Skip to content

✨(storage) implement tiered storage#486

Open
sylvinus wants to merge 18 commits into
mainfrom
object_storage
Open

✨(storage) implement tiered storage#486
sylvinus wants to merge 18 commits into
mainfrom
object_storage

Conversation

@sylvinus
Copy link
Copy Markdown
Member

@sylvinus sylvinus commented Jan 16, 2026

This allows to use S3-compatible object storage to offload blobs, making Postgres much lighter. We design for storing ~1B emails on a single instance.

Fixes #185.

Summary by CodeRabbit

Release Notes

  • New Features

    • Blob offloading to S3-compatible object storage with automatic hourly scheduling based on age and size thresholds
    • Optional AES-256-GCM encryption for blob data at rest with configurable key rotation
    • Automated garbage collection system for unreferenced blobs
    • Enhanced admin interface displaying blob storage location and encryption key assignment
  • Documentation

    • New tiered storage architecture guide and deployment configuration instructions

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Jan 16, 2026

📝 Walkthrough

Walkthrough

Adds S3-backed tiered blob storage with AES-GCM encryption, schema/migration, GC with reservations, offload/verify/restore tooling, API/MDA refactors, admin updates, Redis-only coalescer, importer Range reads, settings/env/docs/CI updates, Celery beat, and extensive tests.

Changes

Tiered Blob Storage, Encryption, GC, and Integrations

Layer / File(s) Summary
Data contracts and schema
src/backend/core/enums.py, src/backend/core/migrations/..., src/backend/core/models.py
Adds compression parser and storage enum; migrates Blob for storage_location/encryption_key_id and nullable raw_content; introduces MailboxBlob; sets PROTECT FKs and partial index.
Core storage/encryption and GC services
src/backend/core/services/tiered_storage.py, src/backend/core/services/blob_gc.py
Implements AEAD encrypt/decrypt, upload/download, rotate/restore, orphan cleanup; Redis candidate set, GC draining, and upload reservation helpers.
Signals
src/backend/core/signals.py
Post-delete handlers schedule related blobs for GC; removes pre-delete eager deletion.
Model logic
src/backend/core/models.py, src/backend/core/utils.py
create_blob now dedups, compresses, encrypts; adds is_referenced/user_can_access; updates get_content; JSONValue.to_python hardens error handling.
API, serializers, metrics, MDA
src/backend/core/api/..., src/backend/core/mda/..., src/backend/core/factories.py
Upload size-cap and reservation flow; GC scheduling on template updates; metrics draft path; MDA compose-and-sign, atomic Blob+Message, attachment provenance.
Tasks and commands
src/backend/core/services/tiered_storage_tasks.py, .../management/commands/*
Hourly offload loop and per-blob worker; re_store_blobs rotation/restore; verify_blobs audit; delete_orphan_attachments.
Search/importer
src/backend/core/services/search/*, src/backend/core/services/importer/eml_tasks.py
Coalescer now Redis-only; search flag check tightened; importer uses bounded S3 Range read.
Admin/UI
src/backend/core/admin.py, src/backend/core/templates/.../blob/change_form.html
Admin restricted to superusers; Blob listing adds storage/encryption; adds download action.
Settings/docs/CI
src/backend/messages/settings.py, env.d/..., docs/*, .github/workflows/messages.yml, Makefile, src/backend/messages/celery_app.py, src/backend/core/apps.py, src/backend/core/checks.py
Adds message-blobs storage and blob settings; system checks; tiered-storage docs; env defaults; CI bucket creation; beat schedule; app imports checks.
Tests/fixtures/markers
src/backend/core/tests/**, src/backend/pyproject.toml
Adds Redis/bucket fixtures and marker; comprehensive tests for storage, GC, tasks, commands, API/MDA, metrics, search, and signals; adjusts factories usage.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant API
  participant DB
  participant GC as GC/Reservations
  participant Tiered as TieredStorageService
  participant S3 as Object Storage

  Client->>API: POST /blob/upload
  API->>GC: upload_and_reserve_blob(mailbox, content)
  GC->>DB: Create Blob + MailboxBlob (atomic)
  API-->>Client: blobId,size,sha256

  Note over API,Tiered: Later (hourly offload)
  API->>Tiered: offload_one_blob(id)
  Tiered->>S3: PUT blobs/{key_id}/{shard}/{sha}
  Tiered->>DB: Update location/encryption, clear raw_content

  Client->>API: GET /blob/{id}
  API->>DB: user_can_access(user, blob.id)
  alt Offloaded
    API->>Tiered: download_blob(id)
    Tiered->>S3: GET object
    Tiered-->>API: decrypted bytes
  else In Postgres
    API->>DB: read raw_content and decrypt
  end
  API-->>Client: content
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested reviewers

  • jbpenrath
  • sdemagny
✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 466-495: The current flow writes the newly encrypted object via
self.service.storage.save(storage_key, ...) before updating
blob.encryption_key_id inside transaction.atomic(), risking storage/DB
inconsistency if the DB update fails; instead, write the new encrypted bytes to
a temporary object (e.g. derive a temp key from storage_key and new_key_id)
using self.service.storage.save(temp_key, ContentFile(encrypted)), then perform
the DB update inside transaction.atomic() (update blob.encryption_key_id and
save), and only after the transaction succeeds atomically remove/rename the temp
object to the final storage_key (or copy temp→final and delete temp) so storage
and DB remain consistent; reference symbols: self.service.storage.save,
storage_key, temp_key (create), self.service.encrypt, blob.encryption_key_id,
transaction.atomic.

In `@src/backend/core/services/tiered_storage.py`:
- Around line 31-43: In __init__, the enabled gate currently checks for an
OPTIONS.endpoint_url which wrongly disables valid S3 setups; instead set
self.enabled based on presence of the "message-blobs" storage config itself
(e.g. check that settings.STORAGES contains a non-empty "message-blobs" entry).
Update the assignment to self.enabled to use
settings.STORAGES.get("message-blobs") (or "message-blobs" in settings.STORAGES
and truthy) rather than digging for OPTIONS.endpoint_url so AWS S3 configs
without endpoint_url remain enabled.
🧹 Nitpick comments (3)
src/backend/core/services/tiered_storage_tasks.py (1)

68-133: Consider adding retry for transient failures.

The task handles lock contention gracefully by returning "locked" status, but transient failures (network issues, temporary S3 unavailability) at line 131 are logged and returned as errors without retry. The periodic offload_blobs_task will eventually re-queue these blobs, but adding explicit retry behavior for transient exceptions (e.g., ConnectionError, Timeout) could improve reliability.

💡 Optional: Add retry for transient failures
-@celery_app.task(bind=True)
+@celery_app.task(bind=True, autoretry_for=(ConnectionError, TimeoutError), retry_backoff=True, max_retries=3)
 def offload_single_blob_task(self, blob_id: str) -> Dict[str, Any]:
src/backend/core/models.py (1)

1536-1557: Enforce storage_location/raw_content invariants at the DB layer.
With raw_content now nullable, inconsistent states (e.g., OBJECT_STORAGE + non-null content)
become possible and will surface as runtime errors in get_content. A check constraint makes
the invariant explicit and avoids silent drift. This will require a migration.

♻️ Proposed constraint
         constraints = [
             models.CheckConstraint(
                 check=(
                     models.Q(mailbox__isnull=False) | models.Q(maildomain__isnull=False)
                 ),
                 name="blob_has_owner",
             ),
+            models.CheckConstraint(
+                check=(
+                    models.Q(
+                        storage_location=BlobStorageLocationChoices.POSTGRES,
+                        raw_content__isnull=False,
+                    )
+                    | models.Q(
+                        storage_location=BlobStorageLocationChoices.OBJECT_STORAGE,
+                        raw_content__isnull=True,
+                    )
+                ),
+                name="blob_storage_location_matches_content",
+            ),
         ]

As per coding guidelines, enforce data integrity with model constraints.

Also applies to: 1583-1589

src/backend/core/services/tiered_storage.py (1)

244-281: Guard against orphan-delete races and capture delete errors.
There’s a TOCTOU window between the reference count (Line 259-263) and deletion (Line 274-275);
a concurrent offload could add a reference after the count and still have its object deleted.
Consider an advisory lock keyed by SHA256 or a transactional guard around the check+delete.

Also, capture the storage deletion exception to Sentry so cleanup failures are observable.

♻️ Suggested Sentry capture
 from cryptography.fernet import Fernet
+from sentry_sdk import capture_exception
@@
-        except Exception as e:  # pylint: disable=broad-except
-            logger.warning("Failed to delete blob from storage %s: %s", key, e)
+        except Exception as exc:  # pylint: disable=broad-except
+            capture_exception(exc)
+            logger.warning("Failed to delete blob from storage %s: %s", key, exc)
             return False

As per coding guidelines, capture and report exceptions to Sentry.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ec4c7ef and 8858822.

📒 Files selected for processing (22)
  • compose.yaml
  • env.d/development/backend.defaults
  • src/backend/core/api/viewsets/config.py
  • src/backend/core/enums.py
  • src/backend/core/management/commands/verify_tiered_storage.py
  • src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py
  • src/backend/core/models.py
  • src/backend/core/services/search/search.py
  • src/backend/core/services/tiered_storage.py
  • src/backend/core/services/tiered_storage_tasks.py
  • src/backend/core/signals.py
  • src/backend/core/tests/commands/__init__.py
  • src/backend/core/tests/commands/test_verify_tiered_storage.py
  • src/backend/core/tests/conftest.py
  • src/backend/core/tests/services/__init__.py
  • src/backend/core/tests/services/test_tiered_storage.py
  • src/backend/core/tests/tasks/__init__.py
  • src/backend/core/tests/tasks/test_task_send_message.py
  • src/backend/core/tests/tasks/test_tiered_storage_tasks.py
  • src/backend/core/utils.py
  • src/backend/messages/celery_app.py
  • src/backend/messages/settings.py
🧰 Additional context used
📓 Path-based instructions (6)
src/backend/**/*.py

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

src/backend/**/*.py: Follow Django/PEP 8 style with a 100-character line limit
Use descriptive, snake_case names for variables and functions
Use Django ORM for database access; avoid raw SQL unless necessary for performance
Use Django’s built-in user model and authentication framework
Prefer try-except blocks to handle exceptions in business logic and views
Log expected and unexpected actions with appropriate log levels
Capture and report exceptions to Sentry; use capture_exception() for custom errors
Do not log sensitive information (tokens, passwords, financial/health data, PII)

Files:

  • src/backend/messages/celery_app.py
  • src/backend/core/tests/tasks/__init__.py
  • src/backend/core/api/viewsets/config.py
  • src/backend/core/tests/commands/test_verify_tiered_storage.py
  • src/backend/core/services/search/search.py
  • src/backend/core/management/commands/verify_tiered_storage.py
  • src/backend/core/utils.py
  • src/backend/core/signals.py
  • src/backend/core/services/tiered_storage_tasks.py
  • src/backend/core/tests/services/test_tiered_storage.py
  • src/backend/core/tests/tasks/test_task_send_message.py
  • src/backend/messages/settings.py
  • src/backend/core/tests/conftest.py
  • src/backend/core/models.py
  • src/backend/core/tests/services/__init__.py
  • src/backend/core/enums.py
  • src/backend/core/services/tiered_storage.py
  • src/backend/core/tests/commands/__init__.py
  • src/backend/core/tests/tasks/test_tiered_storage_tasks.py
  • src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py
src/backend/**/{tests.py,tests/**/*.py}

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

src/backend/**/{tests.py,tests/**/*.py}: Use Django’s testing tools (pytest-django) to ensure code quality and reliability
Unit tests should focus on a single use case, keep assertions minimal, and cover all possible cases

Files:

  • src/backend/core/tests/tasks/__init__.py
  • src/backend/core/tests/commands/test_verify_tiered_storage.py
  • src/backend/core/tests/services/test_tiered_storage.py
  • src/backend/core/tests/tasks/test_task_send_message.py
  • src/backend/core/tests/conftest.py
  • src/backend/core/tests/services/__init__.py
  • src/backend/core/tests/commands/__init__.py
  • src/backend/core/tests/tasks/test_tiered_storage_tasks.py
src/backend/**/{settings.py,middleware.py}

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

Use middleware judiciously for cross-cutting concerns (authentication, logging, caching)

Files:

  • src/backend/messages/settings.py
src/backend/**/settings.py

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

src/backend/**/settings.py: Leverage Django’s caching framework (e.g., Redis/Memcached) where appropriate
Use Django’s cache framework with a backend like Redis or Memcached to reduce DB load
Optimize static file handling using Django’s staticfiles pipeline (e.g., WhiteNoise)

Files:

  • src/backend/messages/settings.py
src/backend/**/{models.py,forms.py,views.py}

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

Keep business logic in models and forms; keep views thin and focused on request handling

Files:

  • src/backend/core/models.py
src/backend/**/{models.py,migrations/**/*.py}

📄 CodeRabbit inference engine (.cursor/rules/django-python.mdc)

Implement database indexing and query optimization (Model Meta indexes, constraints)

Files:

  • src/backend/core/models.py
  • src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py
🧠 Learnings (1)
📚 Learning: 2025-09-02T10:12:12.835Z
Learnt from: CR
Repo: suitenumerique/messages PR: 0
File: .cursor/rules/django-python.mdc:0-0
Timestamp: 2025-09-02T10:12:12.835Z
Learning: Applies to src/backend/**/{tests.py,tests/**/*.py} : Use Django’s testing tools (pytest-django) to ensure code quality and reliability

Applied to files:

  • src/backend/core/tests/conftest.py
  • src/backend/core/tests/commands/__init__.py
🧬 Code graph analysis (3)
src/backend/messages/settings.py (1)
src/backend/core/utils.py (1)
  • JSONValue (8-22)
src/backend/core/tests/conftest.py (1)
src/backend/core/services/tiered_storage.py (1)
  • storage (45-49)
src/backend/core/models.py (2)
src/backend/core/enums.py (2)
  • BlobStorageLocationChoices (61-65)
  • CompressionTypeChoices (54-58)
src/backend/core/services/tiered_storage.py (6)
  • TieredStorageService (28-296)
  • encrypt (68-91)
  • compute_storage_key (52-66)
  • decrypt (93-117)
  • download_blob (208-242)
  • delete_if_orphaned (244-280)
🪛 Ruff (0.14.11)
src/backend/core/tests/commands/test_verify_tiered_storage.py

31-31: import should be at the top-level of a file

(PLC0415)


180-180: import should be at the top-level of a file

(PLC0415)


208-208: import should be at the top-level of a file

(PLC0415)


275-275: import should be at the top-level of a file

(PLC0415)


357-357: import should be at the top-level of a file

(PLC0415)


397-397: import should be at the top-level of a file

(PLC0415)


415-415: import should be at the top-level of a file

(PLC0415)


442-442: import should be at the top-level of a file

(PLC0415)


470-470: import should be at the top-level of a file

(PLC0415)


506-506: import should be at the top-level of a file

(PLC0415)


552-552: import should be at the top-level of a file

(PLC0415)


596-596: import should be at the top-level of a file

(PLC0415)


630-630: import should be at the top-level of a file

(PLC0415)

src/backend/core/management/commands/verify_tiered_storage.py

280-280: import should be at the top-level of a file

(PLC0415)


282-282: import should be at the top-level of a file

(PLC0415)

src/backend/core/tests/services/test_tiered_storage.py

114-114: import should be at the top-level of a file

(PLC0415)


129-129: import should be at the top-level of a file

(PLC0415)


271-271: import should be at the top-level of a file

(PLC0415)


296-296: import should be at the top-level of a file

(PLC0415)


362-362: import should be at the top-level of a file

(PLC0415)


396-396: import should be at the top-level of a file

(PLC0415)


571-571: import should be at the top-level of a file

(PLC0415)


614-614: import should be at the top-level of a file

(PLC0415)


654-654: import should be at the top-level of a file

(PLC0415)

src/backend/core/tests/conftest.py

22-22: import should be at the top-level of a file

(PLC0415)

src/backend/core/models.py

1474-1474: import should be at the top-level of a file

(PLC0415)


1625-1625: import should be at the top-level of a file

(PLC0415)


1656-1656: import should be at the top-level of a file

(PLC0415)

src/backend/core/services/tiered_storage.py

132-132: import should be at the top-level of a file

(PLC0415)


149-149: import should be at the top-level of a file

(PLC0415)


257-257: import should be at the top-level of a file

(PLC0415)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
  • GitHub Check: test-e2e
  • GitHub Check: lint-back
  • GitHub Check: build-front
  • GitHub Check: test-front
  • GitHub Check: check-api-state
  • GitHub Check: lint-front
  • GitHub Check: test-back
🔇 Additional comments (52)
src/backend/core/tests/tasks/__init__.py (1)

1-1: LGTM!

Clean package initializer for the tasks test module.

src/backend/core/tests/commands/__init__.py (1)

1-1: LGTM!

Appropriate package initializer for management command tests.

src/backend/core/tests/services/__init__.py (1)

1-1: LGTM!

Appropriate package initializer for service tests.

src/backend/core/tests/tasks/test_task_send_message.py (3)

3-3: LGTM!

Appropriate pylint disables for test file patterns: no-value-for-parameter for Celery task invocations without bound arguments, and unused-argument for pytest fixtures that establish state but aren't directly referenced.


45-45: Good hygiene.

Using _mailbox prefix correctly signals the variable is intentionally unused while still unpacking the fixture tuple.


99-99: Consistent with the pattern above.

src/backend/core/api/viewsets/config.py (1)

139-140: LGTM!

Using getattr(settings, setting, None) ensures consistent API response schema where all keys are always present, aligning with the OpenAPI specification that marks these fields as required. This is cleaner than conditional inclusion and provides predictable behavior for frontend consumers.

src/backend/core/utils.py (1)

14-22: LGTM!

Returning None for empty/whitespace strings allows django-configurations to fall back to default values, which is appropriate for optional JSON configuration like encryption keys.

env.d/development/backend.defaults (1)

54-66: LGTM!

The development defaults are well-documented with clear comments. Tiered storage is configured with a 3-day offload threshold and credentials follow the existing pattern for msg-imports.

compose.yaml (1)

84-84: LGTM!

The msg-blobs bucket creation follows the existing pattern. Correctly omits the ILM expiration rules since blobs are intended for long-term storage unlike temporary imports.

src/backend/messages/celery_app.py (1)

48-52: LGTM! The new beat schedule entry follows the existing pattern and an hourly interval is appropriate for blob offloading. The core.services.tiered_storage_tasks.offload_blobs_task is properly defined with the @celery_app.task(bind=True) decorator and has comprehensive test coverage.

src/backend/core/services/search/search.py (1)

40-42: No action required; direct access to settings.OPENSEARCH_INDEX_THREADS is safe.

The setting is properly defined in src/backend/messages/settings.py as a BooleanValue with a default of True. It will always exist at runtime, and direct access does not risk AttributeError. This pattern is already used consistently throughout the codebase in multiple other files (src/backend/core/signals.py, src/backend/core/services/search/tasks.py).

Likely an incorrect or invalid review comment.

src/backend/core/enums.py (1)

61-66: LGTM!

The new BlobStorageLocationChoices enum follows the existing patterns in this file with appropriate integer values and descriptive labels. Good placement in the logical order of the file.

src/backend/core/tests/conftest.py (1)

14-45: LGTM!

The session-scoped fixture properly defers Django imports and handles missing storage configuration gracefully. The broad exception handling is appropriate here to prevent test setup failures when object storage isn't configured. The pylint: disable comment at line 3 correctly covers the in-function import pattern.

src/backend/core/services/tiered_storage_tasks.py (1)

26-65: LGTM!

The task efficiently streams eligible blob IDs using values_list with iterator() to avoid memory pressure. The filtering criteria (age threshold + minimum size) appropriately limit the scope of each run.

src/backend/core/tests/commands/test_verify_tiered_storage.py (3)

1-7: LGTM!

Comprehensive test coverage for the verify_tiered_storage management command. The tests appropriately cover disabled states, E2E verification modes, hash verification with corruption detection, and re-encryption workflows. The in-function imports are intentionally used for test isolation and the pylint: disable comment at line 7 correctly covers this pattern.


78-80: Good cleanup pattern.

Consistent use of try/finally with existence checks ensures test isolation and prevents storage pollution across test runs.


468-534: Thorough E2E test for object storage re-encryption.

This test covers the complete workflow: encrypting with old key, uploading to storage, rotating keys, re-encrypting, and verifying content integrity via download and decompression.

src/backend/core/signals.py (1)

51-52: The concern is unfounded. OPENSEARCH_INDEX_THREADS is always defined in settings with a default value (True in src/backend/messages/settings.py:83-84 and False as a fallback at line 1090). Direct attribute access to settings.OPENSEARCH_INDEX_THREADS is safe and will not raise AttributeError. The change from getattr() to direct access is a valid simplification that removes unnecessary defensive programming.

Likely an incorrect or invalid review comment.

src/backend/core/migrations/0014_blob_encryption_key_id_blob_storage_location_and_more.py (1)

12-27: Migration structure looks correct.

The migration properly:

  • Adds storage_location with db_index=True for efficient filtering
  • Makes raw_content nullable to support object storage blobs
  • Uses hardcoded choices in migration (correct Django practice)

One consideration: if key rotation queries will frequently filter by encryption_key_id (e.g., finding all blobs encrypted with a specific key), adding an index on that field could be beneficial. However, this can be deferred based on actual query patterns.

src/backend/core/tests/services/test_tiered_storage.py (5)

24-181: Comprehensive unit test coverage for encryption/decryption.

The unit tests thoroughly cover:

  • Storage key computation with different SHA256 prefixes
  • Encryption passthrough when disabled (key_id=0)
  • Proper error handling for invalid/missing keys and corrupted data
  • Key rotation scenarios maintaining backward compatibility

Good separation of concerns with no DB or storage dependencies.


183-263: Good database-level test coverage.

Tests properly validate:

  • Default storage location behavior
  • Content retrieval from PostgreSQL
  • Error handling when content is missing
  • SHA256-based storage key derivation
  • Deduplication detection via check_already_uploaded

389-420: Critical regression test for double-encryption bug.

This test (test_offload_with_encryption_roundtrip) is valuable as it explicitly guards against the double-encryption bug mentioned in the docstring. The test verifies that encrypted blobs can be offloaded and read back correctly, which is a common failure point.


494-562: Important deduplication behavior documented in test.

The test correctly validates that when two blobs with identical content are encrypted with different keys, deduplication uses the first blob's encryption key_id. The inline comment at line 554 clarifies this is expected behavior until key rotation is complete.


565-667: Key rotation tests cover both storage locations.

The tests properly validate the re-encryption workflow for:

  • PostgreSQL-stored blobs (decrypt with old key, encrypt with new key, update in place)
  • Object storage blobs (download, decrypt, re-encrypt, upload, update metadata)

Both tests verify content integrity after rotation, which is critical.

src/backend/messages/settings.py (2)

234-259: New storage configuration follows existing patterns.

The message-blobs storage configuration mirrors the existing message-imports pattern. The default bucket name msg-blobs is provided.

One difference: endpoint_url has no default value, whereas some configurations might expect a default for local development. Verify this is intentional and that the development environment properly sets STORAGE_MESSAGE_BLOBS_ENDPOINT_URL.


376-398: Well-documented encryption and offload configuration.

The configuration properly:

  • Documents the key format and key_id=0 convention
  • Uses appropriate types (JSONValue for dict, PositiveIntegerValue for IDs)
  • Defaults to encryption disabled (key_id=0), requiring explicit opt-in
  • Allows fine-grained control over offload timing and size thresholds
src/backend/core/tests/tasks/test_tiered_storage_tasks.py (5)

32-48: Disabled state test properly mocks settings.

The test correctly mocks the storage configuration to verify the task gracefully handles the disabled state.


55-120: Good coverage of blob eligibility criteria.

The tests properly validate:

  • Age-based filtering using TIERED_STORAGE_OFFLOAD_AFTER_DAYS
  • Size-based filtering using TIERED_STORAGE_OFFLOAD_MIN_SIZE
  • The use of Blob.objects.filter().update() correctly bypasses any auto-update timestamps

The conditional assertion at line 119 (if settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0) adapts to the test environment settings, which is acceptable.


156-171: Consistent disabled state handling test.

Follows the same mocking pattern as the batch task test.


242-264: Good error handling test with rollback verification.

The test test_handles_upload_error properly verifies that:

  1. Upload errors are caught and reported
  2. The blob state is preserved (transaction rolled back)
  3. Storage location remains POSTGRES and raw_content is intact

This ensures data integrity during upload failures.


335-352: Idempotency test validates concurrent safety.

The test_concurrent_offload_idempotent test verifies that repeated offload attempts for the same blob are handled gracefully, returning already_offloaded on subsequent calls. This is important for Celery task retry scenarios.

src/backend/core/management/commands/verify_tiered_storage.py (6)

24-88: Well-designed CLI interface with clear modes.

The command provides:

  • Multiple verification modes (db-to-storage, storage-to-db, full)
  • Safety features (--dry-run, --limit)
  • Key rotation capability (--re-encrypt)
  • Proper early exit when storage is not configured

89-134: DB-to-storage verification handles scale well.

The method:

  • Uses iterator(chunk_size=1000) to avoid memory issues with large datasets
  • Reports missing blobs to stderr (appropriate severity for potential data loss)
  • Respects the --limit option for sampling

136-218: Storage-to-DB verification with orphan cleanup.

The method properly:

  • Validates storage path format before processing
  • Detects orphans (objects without DB references)
  • Optionally deletes orphans with --fix
  • Optionally verifies hashes (wisely marked as "slow")

Progress is reported every 100 objects (line 201), which provides good feedback during long operations.


220-254: Storage listing handles multiple backends.

The method:

  • Prefers direct boto3 access with pagination for efficiency
  • Falls back to Django's listdir for compatibility
  • Raises clear error when listing is not supported

256-305: Hash verification covers the complete pipeline.

The method correctly:

  • Downloads encrypted content from storage
  • Decrypts using the blob's encryption_key_id
  • Decompresses based on compression type
  • Computes and compares SHA256 hash

The broad exception handling is acceptable here since the goal is to report issues, not crash.


307-402: Re-encryption workflow with proper validation.

The method:

  • Validates encryption configuration before starting
  • Uses smaller chunk size (100) appropriate for the heavier re-encryption operation
  • Supports --dry-run for safe preview
  • Provides clear progress reporting and final summary
src/backend/core/models.py (6)

31-33: No review needed for this import change.


1472-1487: Encryption is integrated cleanly into blob creation.
Storing encrypted bytes and the key id alongside compression keeps retrieval consistent.


1595-1599: Conditional size_compressed update makes sense.
This avoids clobbering the stored size after offload clears raw_content.


1601-1609: No issues spotted.


1624-1643: Content retrieval flow reads cleanly.
The PostgreSQL vs object-storage branches and decryption/decompression steps are clear.


1645-1660: Deletion flow aligns with object-storage cleanup.

src/backend/core/services/tiered_storage.py (8)

1-24: No review needed for the module header/imports.


44-49: Lazy storage initialization looks good.


51-66: Storage key sharding is straightforward.


68-117: Encryption/decryption flow is consistent with key_id semantics.
Please double-check that configured key formats align with Fernet requirements.


119-155: DB-backed dedup lookups are clear.


208-243: Download path is clear and well-scoped.


282-296: Existence check is straightforward.


157-207: Return value is correctly handled in production code. The call site in src/backend/core/services/tiered_storage_tasks.py (line 113) properly captures the returned key_id and updates blob.encryption_key_id accordingly. Dedup-specific tests also confirm this pattern works correctly when the same content is uploaded with different encryption keys.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment thread src/backend/core/management/commands/verify_tiered_storage.py Outdated
Comment thread src/backend/core/services/tiered_storage.py
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 449-455: When re-encrypting Postgres blobs in
verify_tiered_storage.py (using self.service.decrypt and self.service.encrypt)
ensure you persist the recomputed size_compressed: after setting
blob.raw_content = encrypted and blob.encryption_key_id = new_key_id, call
blob.save(update_fields=["raw_content", "encryption_key_id", "size_compressed"])
so the recalculated size_compressed is written to the DB (match the approach
used in 0007_blob_size_compressed.py).
- Around line 280-282: Move the local imports of pyzstd and
CompressionTypeChoices out of the function and place them at the top of the
module alongside the existing BlobStorageLocationChoices import so they are
module-level imports; update the import section to include "import pyzstd" and
"from core.enums import CompressionTypeChoices" and remove the in-function
imports where pyzstd and CompressionTypeChoices are currently referenced in
verify_tiered_storage logic.

In `@src/backend/core/signals.py`:
- Around line 109-119: The post_delete signal handler cleanup_blob_storage
currently calls TieredStorageService().delete_if_orphaned() immediately, which
can run inside a transaction that may roll back; wrap the deletion call in
transaction.on_commit to defer it until after successful commit: import
django.db.transaction if needed, capture the instance.sha256 (or
bytes(instance.sha256)) and any required storage_location check inside
cleanup_blob_storage, create a small closure or lambda that calls
TieredStorageService().delete_if_orphaned(...) and register it with
transaction.on_commit, and ensure the existing guard checks
(BlobStorageLocationChoices.OBJECT_STORAGE and service.enabled) are preserved
before scheduling the on_commit callback.

In `@src/backend/core/tests/services/test_tiered_storage.py`:
- Line 114: Several intentional local test imports (e.g. "from
cryptography.fernet import InvalidToken" and the other test-only imports flagged
at the comment's listed locations) are meant to remain inside test functions;
add a trailing "# noqa: PLC0415" to each of those local import lines instead of
moving them to module level so Ruff/Pylint stops reporting
import-outside-toplevel while keeping the imports local to their tests. Ensure
you update each flagged import line (the ones referenced in the review) by
appending the exact comment "# noqa: PLC0415".
🧹 Nitpick comments (2)
src/backend/core/tests/services/test_tiered_storage.py (1)

265-563: Consider skipping E2E tests when object storage isn’t configured.

The docstring says “when available,” but these tests hard‑fail if storage isn’t configured. A skipif makes local/dev runs more resilient.

🔧 Suggested refactor
+# At module level
+_STORAGE_ENABLED = TieredStorageService().enabled
@@
-@pytest.mark.django_db
-class TestTieredStorageE2E:
+@pytest.mark.django_db
+@pytest.mark.skipif(
+    not _STORAGE_ENABLED, reason="Object storage not configured"
+)
+class TestTieredStorageE2E:
src/backend/core/tests/tasks/test_tiered_storage_tasks.py (1)

60-194: Consider skipping E2E task tests when object storage isn’t configured.

These E2E tests will fail in environments without MinIO. A skipif keeps dev/test runs resilient while still exercising coverage in CI.

🔧 Suggested refactor
+# At module level
+_STORAGE_ENABLED = TieredStorageService().enabled
@@
-@pytest.mark.django_db
-class TestOffloadBlobsTaskE2E:
+@pytest.mark.django_db
+@pytest.mark.skipif(
+    not _STORAGE_ENABLED, reason="Object storage not configured"
+)
+class TestOffloadBlobsTaskE2E:
@@
-@pytest.mark.django_db
-class TestOffloadSingleBlobTaskE2E:
+@pytest.mark.django_db
+@pytest.mark.skipif(
+    not _STORAGE_ENABLED, reason="Object storage not configured"
+)
+class TestOffloadSingleBlobTaskE2E:

Also applies to: 221-399

Comment thread src/backend/core/management/commands/verify_tiered_storage.py Outdated
Comment thread src/backend/core/management/commands/verify_tiered_storage.py Outdated
Comment thread src/backend/core/signals.py Outdated
Comment thread src/backend/core/tests/services/test_tiered_storage.py Outdated
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 475-535: The code currently locks and updates only the single Blob
row but the storage object is shared by all Blobs with the same sha256; fix by
selecting and locking the entire cohort (all Blob rows that share the sha256 and
OBJECT_STORAGE) inside the transaction, verify none already have
encryption_key_id == target_key_id (or treat as already-promoted if they do),
then update every locked row's encryption_key_id to new_key_id before calling
transaction.on_commit(_promote_temp). Use a select_for_update() queryset
filtered by blob.sha256 (and storage type/field used for OBJECT_STORAGE) instead
of Blob.objects.select_for_update().get(...), and perform a bulk update or
iterate the locked rows to set and save encryption_key_id so all siblings remain
readable after promotion.

In `@src/backend/core/services/tiered_storage.py`:
- Around line 179-190: The dedup fast-path returns the existing
encryption_key_id without verifying the underlying object is present; change the
logic in the branch that uses check_already_uploaded(sha256_bytes) so it
computes the canonical storage key via compute_storage_key(sha256_bytes) and
verifies the object exists in the storage backend (e.g., storage.exists(key) or
trying to open/read the object) before calling
get_existing_key_id(sha256_bytes); if the object is missing or unreadable, fall
through to the upload path that uses blob.raw_content and storage.save(...) so
we don't convert the row to OBJECT_STORAGE pointing at nothing. Ensure you
reference check_already_uploaded, compute_storage_key, get_existing_key_id,
storage.exists/open, and storage.save in the change.
- Around line 47-61: compute_storage_key currently only uses the SHA of the
uncompressed payload, so blobs with identical source bytes but different
compression metadata collide; change compute_storage_key to include the storage
representation (e.g., compression) in the key or in the input used to derive the
key (either by accepting a compression/representation parameter and returning
something like "blobs/{compression}/{sha[:3]}/{sha}" or by computing the SHA
over the final compressed bytes), and update every call site (notably
upload_blob and the call sites around lines 187-190) to pass the representation
so deduplication is based on both content and representation.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 966620c0-2a4d-4de7-be72-51978a4576ac

📥 Commits

Reviewing files that changed from the base of the PR and between 5d0f39a and 0706ad8.

📒 Files selected for processing (6)
  • src/backend/core/management/commands/verify_tiered_storage.py
  • src/backend/core/services/tiered_storage.py
  • src/backend/core/signals.py
  • src/backend/core/tests/commands/test_verify_tiered_storage.py
  • src/backend/core/tests/services/test_tiered_storage.py
  • src/backend/core/tests/tasks/test_tiered_storage_tasks.py

Comment thread src/backend/core/management/commands/verify_tiered_storage.py Outdated
Comment thread src/backend/core/services/tiered_storage.py Outdated
Comment thread src/backend/core/services/tiered_storage.py Outdated
Comment thread src/backend/core/services/tiered_storage.py Fixed
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

♻️ Duplicate comments (1)
src/backend/core/services/tiered_storage.py (1)

74-88: ⚠️ Potential issue | 🔴 Critical

Include compression in the storage identity.

sha256 is computed from the uncompressed payload, but the stored object is the compressed/encrypted representation. Two blobs with identical source bytes and different compression values will collide on the same key, dedup together in upload_blob(), and later one of them will read back bytes that do not match its own metadata. Key derivation/dedup needs to include the storage representation, or uploads need to canonicalize representation before writing.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/tiered_storage.py` around lines 74 - 88,
compute_storage_key currently derives the object key only from the source SHA256
(sha256_bytes) which ignores the chosen compression/encryption, causing
different storage representations to collide; update key derivation to include
the storage representation (e.g., incorporate the compression string/enum and
any representation-specific metadata into the input used to derive the key) or
alternatively ensure upload_blob canonicalizes the bytes (compress/encrypt)
before computing the SHA used by compute_storage_key so keys reflect the actual
stored bytes; refer to compute_storage_key and upload_blob when making the
change so the key always encodes the compression parameter (or is computed from
post-compression bytes).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/backend/core/management/commands/verify_tiered_storage.py`:
- Around line 332-356: The current loop limits raw queryset rows, but
OBJECT_STORAGE re-encrypts entire sha256 cohorts in
_re_encrypt_object_storage_blob, so change the logic in the command to build a
worklist of unique re-encryption work-units first (each work-unit represents
either a single Postgres row or a unique object-storage cohort identified by
sha256/object key), deduplicate those units, then apply the --limit to that
worklist before processing; update the code paths around Blob, queryset,
_re_encrypt_single_blob and _re_encrypt_object_storage_blob to consult the
prebuilt worklist (not the raw queryset) when counting, printing "Blobs to
re-encrypt", and iterating, and ensure current_key_id and OBJECT_STORAGE checks
are used to determine whether a row maps to a cohort work-unit.

In
`@src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py`:
- Around line 13-17: The new migrations.AddField for model_name='blob' adding
name='encryption_key_id' should create an indexed column and also add a
composite index with storage_location to support key-rotation queries; update
the migration to set db_index=True on the SmallIntegerField for
encryption_key_id (the migrations.AddField entry) and add a migrations.AddIndex
entry that creates an index on ('encryption_key_id','storage_location') (or
equivalent Index/fields tuple) so queries filtering by encryption_key_id and
storage_location use an index.

In `@src/backend/core/models.py`:
- Around line 2189-2201: The new tiered-storage fields (storage_location,
created_at, size, sha256) need composite DB indexes to avoid broad scans; update
the Blob model by adding appropriate Meta.indexes entries (e.g., an index on
("storage_location","created_at","size") for offload scans and an index on
("sha256","storage_location") for dedup/orphan checks) and create a Django
migration that adds these same composite indexes to the database so the ORM and
DB stay in sync; reference the Blob class/Blob.Meta and the existing migration
pattern for adding indexes when implementing the change.

In `@src/backend/core/services/tiered_storage_tasks.py`:
- Around line 78-91: The second Blob lookup inside the transaction/lock (the
select_for_update().get(id=blob_id) call in tiered_storage_tasks.py) can raise
Blob.DoesNotExist if the row was deleted after the initial sha256 lookup; wrap
that select_for_update().get(...) in a try/except catching Blob.DoesNotExist and
return {"status": "not_found", "blob_id": blob_id} instead of letting it bubble
to the broad exception handler; apply the same fix to the other similar block
later in this file (the other select_for_update().get usage around the offload
completion code) so both places avoid logger.exception noise and return
not_found on concurrent delete.

In `@src/backend/core/services/tiered_storage.py`:
- Around line 303-310: The current delete_if_orphaned implementation swallows
all exceptions from storage.delete and returns False, preventing
cleanup_orphaned_blob_task from retrying transient errors; change
delete_if_orphaned so that storage.delete failures are not converted into a
silent False: either re-raise the caught exception (allowing
cleanup_orphaned_blob_task to catch and retry) or return a distinct error result
that cleanup_orphaned_blob_task understands as retryable. Locate
compute_storage_key, delete_if_orphaned and the call to self.storage.delete in
tiered_storage.py and update the exception handling to
propagate/storage-delete-error semantics instead of treating every exception as
"still_referenced".

In `@src/backend/core/tests/conftest.py`:
- Around line 39-47: The current bucket bootstrap incorrectly catches
client.exceptions.NoSuchBucket (which doesn’t exist) and falls back to a broad
except that masks real failures; update the try/except around
client.head_bucket() to catch botocore.exceptions.ClientError (import
botocore.exceptions as needed), inspect the error (e.response['Error']['Code']
or HTTPStatus in e.response['ResponseMetadata']) and only call
client.create_bucket(Bucket=bucket_name) when the error indicates the bucket is
missing (404 / NoSuchBucket), otherwise re-raise or log the unexpected exception
so real setup failures aren’t swallowed; replace references to
client.exceptions.NoSuchBucket with this ClientError check and ensure
client.create_bucket remains the recovery path.

In `@src/backend/core/tests/tasks/test_task_send_message.py`:
- Line 3: Remove the module-level "# pylint:
disable=no-value-for-parameter,unused-argument" and instead apply scoped
disables at the exact offending sites: add "# pylint:
disable=no-value-for-parameter" inline on the direct task invocation lines (the
direct task call sites in this test file) and add "# pylint:
disable=unused-argument" either inline on the specific test function signatures
or on the unused fixture parameter names where they are declared; ensure only
those lines reference the disables so future
unused-argument/no-value-for-parameter issues elsewhere in the file are not
globally suppressed.

In `@src/backend/messages/settings.py`:
- Around line 295-320: The "message-blobs" storage entry is always present in
settings.STORAGES which makes TieredStorageService.enabled truthy even when no
object-store is configured; change settings.py so the "message-blobs" key is
only added when real object-storage config is provided (e.g. check relevant env
vars such as STORAGE_MESSAGE_BLOBS_BUCKET_NAME or
STORAGE_MESSAGE_BLOBS_ENDPOINT_URL/ACCESS_KEY/SECRET in os.environ or via
values.Value().environ_name) instead of unconditionally defining the dict;
locate the "message-blobs" dict in src/backend/messages/settings.py and wrap its
creation/assignment in that conditional so
settings.STORAGES.get("message-blobs") is falsy when no blob config is supplied,
preserving the documented "empty = disabled" behavior used by
TieredStorageService.enabled.

---

Duplicate comments:
In `@src/backend/core/services/tiered_storage.py`:
- Around line 74-88: compute_storage_key currently derives the object key only
from the source SHA256 (sha256_bytes) which ignores the chosen
compression/encryption, causing different storage representations to collide;
update key derivation to include the storage representation (e.g., incorporate
the compression string/enum and any representation-specific metadata into the
input used to derive the key) or alternatively ensure upload_blob canonicalizes
the bytes (compress/encrypt) before computing the SHA used by
compute_storage_key so keys reflect the actual stored bytes; refer to
compute_storage_key and upload_blob when making the change so the key always
encodes the compression parameter (or is computed from post-compression bytes).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 59f4d311-6f3d-406c-bdf2-bf1f1057f13c

📥 Commits

Reviewing files that changed from the base of the PR and between 0706ad8 and 6c43496.

📒 Files selected for processing (23)
  • .github/workflows/messages.yml
  • Makefile
  • env.d/development/backend.defaults
  • src/backend/core/admin.py
  • src/backend/core/enums.py
  • src/backend/core/management/commands/verify_tiered_storage.py
  • src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py
  • src/backend/core/models.py
  • src/backend/core/services/search/search.py
  • src/backend/core/services/tiered_storage.py
  • src/backend/core/services/tiered_storage_tasks.py
  • src/backend/core/signals.py
  • src/backend/core/tests/commands/__init__.py
  • src/backend/core/tests/commands/test_verify_tiered_storage.py
  • src/backend/core/tests/conftest.py
  • src/backend/core/tests/services/__init__.py
  • src/backend/core/tests/services/test_tiered_storage.py
  • src/backend/core/tests/tasks/__init__.py
  • src/backend/core/tests/tasks/test_task_send_message.py
  • src/backend/core/tests/tasks/test_tiered_storage_tasks.py
  • src/backend/core/utils.py
  • src/backend/messages/celery_app.py
  • src/backend/messages/settings.py

Comment thread src/backend/core/management/commands/verify_tiered_storage.py Outdated
Comment on lines +13 to +17
migrations.AddField(
model_name='blob',
name='encryption_key_id',
field=models.SmallIntegerField(default=0, help_text='Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])', verbose_name='encryption key ID'),
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Missing index on encryption_key_id will hurt large-scale key-rotation queries.

At the PR’s target scale, filtering blobs by encryption key can become a full-table scan. Please index this field (and ideally add a composite index with storage_location if queries combine both).

🔧 Suggested migration adjustment
         migrations.AddField(
             model_name='blob',
             name='encryption_key_id',
-            field=models.SmallIntegerField(default=0, help_text='Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])', verbose_name='encryption key ID'),
+            field=models.SmallIntegerField(
+                default=0,
+                db_index=True,
+                help_text='Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])',
+                verbose_name='encryption key ID',
+            ),
         ),
+        migrations.AddIndex(
+            model_name='blob',
+            index=models.Index(
+                fields=['storage_location', 'encryption_key_id'],
+                name='core_blob_storage_enc_idx',
+            ),
+        ),

As per coding guidelines, src/backend/**/{models.py,migrations/**/*.py}: "Implement database indexing and query optimization (Model Meta indexes, constraints)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py`
around lines 13 - 17, The new migrations.AddField for model_name='blob' adding
name='encryption_key_id' should create an indexed column and also add a
composite index with storage_location to support key-rotation queries; update
the migration to set db_index=True on the SmallIntegerField for
encryption_key_id (the migrations.AddField entry) and add a migrations.AddIndex
entry that creates an index on ('encryption_key_id','storage_location') (or
equivalent Index/fields tuple) so queries filtering by encryption_key_id and
storage_location use an index.

Comment thread src/backend/core/models.py Outdated
Comment on lines 2189 to 2201
# Tiered storage fields
storage_location = models.SmallIntegerField(
"storage location",
choices=BlobStorageLocationChoices.choices,
default=BlobStorageLocationChoices.POSTGRES,
help_text="Where the blob content is stored",
db_index=True,
)
encryption_key_id = models.SmallIntegerField(
"encryption key ID",
default=0,
help_text="Encryption key ID (0=none, >=1=encrypted with keys[key_id-1])",
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add composite indexes for the new tiered-storage lookups.

These new columns become the hot filter keys for the periodic offload scan (storage_location, created_at, size) and for dedup/orphan checks (sha256, storage_location). At the scale this PR targets, single-column indexes here will still drive very broad scans on messages_blob. Please add matching composite indexes in Blob.Meta and the migration.

Suggested index shape
 class Meta:
     db_table = "messages_blob"
     verbose_name = "blob"
     verbose_name_plural = "blobs"
     ordering = ["-created_at"]
+    indexes = [
+        models.Index(
+            fields=["storage_location", "created_at", "size"],
+            name="msg_blob_offload_scan",
+        ),
+        models.Index(
+            fields=["sha256", "storage_location"],
+            name="msg_blob_sha_loc",
+        ),
+    ]
     constraints = [

As per coding guidelines, src/backend/**/{models.py,migrations/**/*.py}: Implement database indexing and query optimization (Model Meta indexes, constraints).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/models.py` around lines 2189 - 2201, The new tiered-storage
fields (storage_location, created_at, size, sha256) need composite DB indexes to
avoid broad scans; update the Blob model by adding appropriate Meta.indexes
entries (e.g., an index on ("storage_location","created_at","size") for offload
scans and an index on ("sha256","storage_location") for dedup/orphan checks) and
create a Django migration that adds these same composite indexes to the database
so the ORM and DB stay in sync; reference the Blob class/Blob.Meta and the
existing migration pattern for adding indexes when implementing the change.

Comment thread src/backend/core/services/tiered_storage_tasks.py
Comment thread src/backend/core/services/tiered_storage.py Outdated
Comment thread src/backend/core/tests/conftest.py Outdated
Comment thread src/backend/core/tests/tasks/test_task_send_message.py Outdated
Comment on lines +295 to +320
"message-blobs": {
"BACKEND": "storages.backends.s3.S3Storage",
"OPTIONS": {
"endpoint_url": values.Value(
environ_name="STORAGE_MESSAGE_BLOBS_ENDPOINT_URL",
environ_prefix=None,
),
"bucket_name": values.Value(
"msg-blobs",
environ_name="STORAGE_MESSAGE_BLOBS_BUCKET_NAME",
environ_prefix=None,
),
"access_key": values.Value(
environ_name="STORAGE_MESSAGE_BLOBS_ACCESS_KEY",
environ_prefix=None,
),
"secret_key": values.Value(
environ_name="STORAGE_MESSAGE_BLOBS_SECRET_KEY",
environ_prefix=None,
),
"region_name": values.Value(
environ_name="STORAGE_MESSAGE_BLOBS_REGION_NAME",
environ_prefix=None,
),
},
},
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Gate message-blobs registration on real object-storage config.

TieredStorageService.enabled is just bool(settings.STORAGES.get("message-blobs")), so defining this alias unconditionally here makes blob offloading look enabled even when STORAGE_MESSAGE_BLOBS_* is intentionally left empty. That breaks the documented “empty = disabled” contract and can send the offload path into a partially configured S3 backend instead of keeping blobs in Postgres.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/messages/settings.py` around lines 295 - 320, The "message-blobs"
storage entry is always present in settings.STORAGES which makes
TieredStorageService.enabled truthy even when no object-store is configured;
change settings.py so the "message-blobs" key is only added when real
object-storage config is provided (e.g. check relevant env vars such as
STORAGE_MESSAGE_BLOBS_BUCKET_NAME or
STORAGE_MESSAGE_BLOBS_ENDPOINT_URL/ACCESS_KEY/SECRET in os.environ or via
values.Value().environ_name) instead of unconditionally defining the dict;
locate the "message-blobs" dict in src/backend/messages/settings.py and wrap its
creation/assignment in that conditional so
settings.STORAGES.get("message-blobs") is falsy when no blob config is supplied,
preserving the documented "empty = disabled" behavior used by
TieredStorageService.enabled.

Comment thread src/backend/core/services/tiered_storage.py Fixed
Comment thread src/backend/core/services/tiered_storage.py Fixed
@sylvinus sylvinus requested a review from jbpenrath April 29, 2026 00:57
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/backend/core/models.py (1)

2077-2102: ⚠️ Potential issue | 🟠 Major

This configurable default is still bypassed by Mailbox.create_blob().

BlobManager.create_blob() now uses compression=None to mean “read MESSAGES_BLOB_COMPRESS”, but src/backend/core/models.py:752-778 still defaults Mailbox.create_blob(..., compression=CompressionTypeChoices.ZSTD) and forwards that value unconditionally. Callers using the mailbox helper will never reach this new fallback.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/models.py` around lines 2077 - 2102, Mailbox.create_blob
currently supplies CompressionTypeChoices.ZSTD (hard default) and forwards it to
BlobManager.create_blob which treats compression=None as "use
settings.MESSAGES_BLOB_COMPRESS", so callers of Mailbox.create_blob never hit
the new configurable default; change Mailbox.create_blob to accept compression:
Optional[CompressionTypeChoices] = None (or if the signature must remain, ensure
it passes None when the caller didn't explicitly request a compression) and
forward that None to BlobManager.create_blob so
BlobManager.parse_compression_spec can apply settings.MESSAGES_BLOB_COMPRESS;
update references to the Mailbox.create_blob parameter handling to distinguish
"unspecified" vs explicit CompressionTypeChoices.ZSTD and only pass ZSTD when
explicitly requested.
♻️ Duplicate comments (6)
src/backend/core/services/tiered_storage.py (3)

291-298: ⚠️ Potential issue | 🟠 Major

Propagate delete failures so cleanup can retry them.

cleanup_orphaned_blob_task() only retries when delete_if_orphaned() raises. Converting storage delete failures into False makes the task report "still_referenced" on transient backend errors and leaks the orphaned object until manual repair.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/tiered_storage.py` around lines 291 - 298, The
current delete_if_orphaned implementation swallows storage.delete failures by
logging and returning False, preventing cleanup_orphaned_blob_task from
retrying; change the code so storage.delete failures propagate: in
delete_if_orphaned (the block that calls self.compute_storage_key and
self.storage.delete) remove or narrow the broad except that returns False and
instead log the error and re-raise the exception (or allow it to bubble up) so
cleanup_orphaned_blob_task sees the exception and will retry the orphaned
object; reference functions: delete_if_orphaned, compute_storage_key,
self.storage.delete, and cleanup_orphaned_blob_task.

185-192: ⚠️ Potential issue | 🟠 Major

Don't log full storage keys.

These log lines expose the object path, which embeds the blob SHA and key cohort. That is sensitive storage metadata and should not be written to normal application logs; log blob.id and the outcome instead.

Based on learnings, src/backend/**/*.py: Do not log sensitive information (tokens, passwords, financial/health data, PII).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/tiered_storage.py` around lines 185 - 192, The
debug/info logs currently emit sensitive storage keys/paths (see logger.debug
and logger.info around the dedupe/save flow); change these to avoid logging the
full storage key or existing_path returned by
compute_storage_key_for_blob/storage.save and instead log only non-sensitive
identifiers such as blob.id and the outcome (e.g., "deduped" or "uploaded") and,
if needed, existing_key_id (only if it is non-sensitive); update the
logger.debug that mentions existing_path and the logger.info after storage.save
to remove the path variable and replace with a brief outcome message referencing
blob.id and the operation result.

100-115: ⚠️ Potential issue | 🔴 Critical

Dedup still ignores the stored representation.

The storage key and sibling lookup are keyed by sha256/key_id, but the uploaded bytes are blob.raw_content after compression. Two blobs with identical source bytes and different compression values will reuse the same object and one of them will later read back bytes that do not match its own compression metadata.

Also applies to: 154-193

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/tiered_storage.py` around lines 100 - 115,
compute_storage_key and the sibling lookup currently key on sha256 of the raw
bytes and key_id only, but the uploaded object is the compressed/stored
representation (blob.raw_content), so different compression values can collide;
change the logic to compute the SHA over the actual bytes written (i.e., the
compressed/stored bytes) and include the representation/compression identifier
in the storage key (and sibling lookup) so keys are unique per stored format;
update compute_storage_key to accept the stored-bytes or a representation tag
(e.g., compression) along with key_id, and make the sibling lookup code use that
same computed key/sha-of-stored-bytes so read/write use the identical namespace.
src/backend/core/services/tiered_storage_tasks.py (1)

91-91: ⚠️ Potential issue | 🟠 Major

Treat the locked re-fetch delete race as not_found.

The row can still disappear between the sha256 lookup and the select_for_update().get(...) at Line 91. Right now that benign race falls into the catch-all and returns "error". Catch Blob.DoesNotExist around the locked lookup and return {"status": "not_found", "blob_id": blob_id} instead.

Based on learnings, logger.exception(...) in src/backend/**/*.py is automatically reported to Sentry as an event.

Also applies to: 121-123

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/tiered_storage_tasks.py` at line 91, The selective
row-lock fetch using Blob.objects.select_for_update().get(id=blob_id) can raise
Blob.DoesNotExist if the row was deleted after the sha256 lookup; wrap that
specific call in a try/except catching Blob.DoesNotExist and return {"status":
"not_found", "blob_id": blob_id} instead of falling into the generic error path
(do the same for the similar select_for_update().get(...) at the block around
lines 121-123); ensure you only catch Blob.DoesNotExist (not broad Exception)
and avoid using logger.exception for this benign race.
src/backend/messages/settings.py (1)

295-320: ⚠️ Potential issue | 🟠 Major

Only register message-blobs when blob storage is really configured.

This alias is always present, so TieredStorageService.enabled becomes truthy even when blob-storage env vars are intentionally unset. That pushes offload and verification down a half-configured backend instead of keeping tiered storage disabled.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/messages/settings.py` around lines 295 - 320, The "message-blobs"
storage alias is always registered which makes TieredStorageService.enabled
truthy even when blob env vars are unset; change the settings population so the
"message-blobs" entry is only added when required blob configuration exists
(e.g., STORAGE_MESSAGE_BLOBS_BUCKET_NAME or STORAGE_MESSAGE_BLOBS_ENDPOINT_URL
is present). In practice, update the code that builds the storage aliases (the
dict containing "message-blobs") to conditionally insert that key only if the
relevant values.Value keys (access_key/secret_key/bucket_name or
region/endpoint) are set/Truthy, ensuring TieredStorageService.enabled
accurately reflects a fully configured blob backend.
src/backend/core/models.py (1)

2198-2243: ⚠️ Potential issue | 🟠 Major

Add composite indexes for the new tiered-storage queries.

These fields are now on the hot paths for offload scans and dedup/orphan checks, but Blob.Meta still has no matching composite indexes. At the scale described in this PR, single-column indexes will still leave very broad scans on messages_blob.

As per coding guidelines, src/backend/**/{models.py,migrations/**/*.py}: Implement database indexing and query optimization (Model Meta indexes, constraints).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/models.py` around lines 2198 - 2243, Add composite DB
indexes to the Blob model Meta to support tiered-storage queries: update
Blob.Meta to include a list of models.Index entries for the hot-path
combinations referencing the actual field names—at minimum add indexes on
("storage_location", "mailbox"), ("storage_location", "maildomain") and
("storage_location", "encryption_key_id") (give each index a descriptive name);
this ensures queries in BlobManager and offload/dedup/orphan scans that filter
by storage_location plus owner or key use the composite indexes instead of wide
single-column scans.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/backend/core/services/tiered_storage_tasks.py`:
- Around line 87-89: The broad except in the task is catching
celery.exceptions.Retry raised by self.retry() (used after sha256_advisory_lock
contention and on transient errors), preventing Celery from requeueing; add an
explicit except celery.exceptions.Retry: raise (or re-raise) immediately before
the existing generic except Exception so Retry can propagate, leaving the rest
of the error handling unchanged — target the block that uses
sha256_advisory_lock and self.retry and the subsequent broad except to insert
this handler.

In `@src/backend/core/services/tiered_storage.py`:
- Around line 230-257: rotate_blob() currently calls self.encrypt(...) which
always uses self.active_key_id, so when a caller passes a different
target_key_id the data is encrypted with the wrong key while paths/DB are
updated for the target; fix by either passing target_key_id into the encryption
routine (add an optional parameter to self.encrypt and call
self.encrypt(decrypted, key_id=target_key_id) in both the DB-blob and
OBJECT_STORAGE branches so encryption, compute_storage_key(sha256,
target_key_id) and DB updates use the same key) or, if you intend only to
support the active key, immediately assert target_key_id == self.active_key_id
at the start of rotate_blob() and remove the misleading parameter; update calls
around encrypt, compute_storage_key, and Blob.objects.filter(...).update(...)
accordingly to keep key IDs consistent.

In `@src/backend/core/tests/commands/test_verify_tiered_storage.py`:
- Around line 322-619: The tests that call the management command with
re_encrypt=True mutate the mocked TieredStorageService but fail to patch the
settings that re_encrypt_blobs() reads (MESSAGES_BLOB_ENCRYPTION_KEYS /
MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID), so the command uses ambient Django
settings instead of the test scenario; fix each re_encrypt=True test by patching
the same settings module that re_encrypt_blobs() uses (e.g., patch
"core.services.tiered_storage.settings" or django.conf.settings) and set
MESSAGES_BLOB_ENCRYPTION_KEYS and MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID to
match the service.encryption_keys and service.active_key_id before you patch
"core.management.commands.verify_tiered_storage.TieredStorageService", ensuring
the command and the mocked TieredStorageService see the same encryption config.

In `@src/backend/core/tests/tasks/test_tiered_storage_tasks.py`:
- Around line 61-159: The age-based offload tests fail when
TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0 because offload_blobs_task filters by
size__gte; update the tests (test_queues_eligible_blobs_by_age and
test_immediate_offload_with_zero_days) to ensure created blobs meet the size
threshold (e.g., make content length >=
settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE) or apply
override_settings(TIERED_STORAGE_OFFLOAD_MIN_SIZE=0) for those tests so the
age/immediacy logic is exercised; reference the tests by name and the
offload_blobs_task and TieredStorageService.compute_storage_key_for_blob symbols
when making the change.

---

Outside diff comments:
In `@src/backend/core/models.py`:
- Around line 2077-2102: Mailbox.create_blob currently supplies
CompressionTypeChoices.ZSTD (hard default) and forwards it to
BlobManager.create_blob which treats compression=None as "use
settings.MESSAGES_BLOB_COMPRESS", so callers of Mailbox.create_blob never hit
the new configurable default; change Mailbox.create_blob to accept compression:
Optional[CompressionTypeChoices] = None (or if the signature must remain, ensure
it passes None when the caller didn't explicitly request a compression) and
forward that None to BlobManager.create_blob so
BlobManager.parse_compression_spec can apply settings.MESSAGES_BLOB_COMPRESS;
update references to the Mailbox.create_blob parameter handling to distinguish
"unspecified" vs explicit CompressionTypeChoices.ZSTD and only pass ZSTD when
explicitly requested.

---

Duplicate comments:
In `@src/backend/core/models.py`:
- Around line 2198-2243: Add composite DB indexes to the Blob model Meta to
support tiered-storage queries: update Blob.Meta to include a list of
models.Index entries for the hot-path combinations referencing the actual field
names—at minimum add indexes on ("storage_location", "mailbox"),
("storage_location", "maildomain") and ("storage_location", "encryption_key_id")
(give each index a descriptive name); this ensures queries in BlobManager and
offload/dedup/orphan scans that filter by storage_location plus owner or key use
the composite indexes instead of wide single-column scans.

In `@src/backend/core/services/tiered_storage_tasks.py`:
- Line 91: The selective row-lock fetch using
Blob.objects.select_for_update().get(id=blob_id) can raise Blob.DoesNotExist if
the row was deleted after the sha256 lookup; wrap that specific call in a
try/except catching Blob.DoesNotExist and return {"status": "not_found",
"blob_id": blob_id} instead of falling into the generic error path (do the same
for the similar select_for_update().get(...) at the block around lines 121-123);
ensure you only catch Blob.DoesNotExist (not broad Exception) and avoid using
logger.exception for this benign race.

In `@src/backend/core/services/tiered_storage.py`:
- Around line 291-298: The current delete_if_orphaned implementation swallows
storage.delete failures by logging and returning False, preventing
cleanup_orphaned_blob_task from retrying; change the code so storage.delete
failures propagate: in delete_if_orphaned (the block that calls
self.compute_storage_key and self.storage.delete) remove or narrow the broad
except that returns False and instead log the error and re-raise the exception
(or allow it to bubble up) so cleanup_orphaned_blob_task sees the exception and
will retry the orphaned object; reference functions: delete_if_orphaned,
compute_storage_key, self.storage.delete, and cleanup_orphaned_blob_task.
- Around line 185-192: The debug/info logs currently emit sensitive storage
keys/paths (see logger.debug and logger.info around the dedupe/save flow);
change these to avoid logging the full storage key or existing_path returned by
compute_storage_key_for_blob/storage.save and instead log only non-sensitive
identifiers such as blob.id and the outcome (e.g., "deduped" or "uploaded") and,
if needed, existing_key_id (only if it is non-sensitive); update the
logger.debug that mentions existing_path and the logger.info after storage.save
to remove the path variable and replace with a brief outcome message referencing
blob.id and the operation result.
- Around line 100-115: compute_storage_key and the sibling lookup currently key
on sha256 of the raw bytes and key_id only, but the uploaded object is the
compressed/stored representation (blob.raw_content), so different compression
values can collide; change the logic to compute the SHA over the actual bytes
written (i.e., the compressed/stored bytes) and include the
representation/compression identifier in the storage key (and sibling lookup) so
keys are unique per stored format; update compute_storage_key to accept the
stored-bytes or a representation tag (e.g., compression) along with key_id, and
make the sibling lookup code use that same computed key/sha-of-stored-bytes so
read/write use the identical namespace.

In `@src/backend/messages/settings.py`:
- Around line 295-320: The "message-blobs" storage alias is always registered
which makes TieredStorageService.enabled truthy even when blob env vars are
unset; change the settings population so the "message-blobs" entry is only added
when required blob configuration exists (e.g., STORAGE_MESSAGE_BLOBS_BUCKET_NAME
or STORAGE_MESSAGE_BLOBS_ENDPOINT_URL is present). In practice, update the code
that builds the storage aliases (the dict containing "message-blobs") to
conditionally insert that key only if the relevant values.Value keys
(access_key/secret_key/bucket_name or region/endpoint) are set/Truthy, ensuring
TieredStorageService.enabled accurately reflects a fully configured blob
backend.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 9ce7b1d7-830c-4186-bee0-8fdf30b9a6db

📥 Commits

Reviewing files that changed from the base of the PR and between 6c43496 and bab23c8.

📒 Files selected for processing (13)
  • env.d/development/backend.defaults
  • src/backend/core/apps.py
  • src/backend/core/checks.py
  • src/backend/core/enums.py
  • src/backend/core/management/commands/verify_tiered_storage.py
  • src/backend/core/models.py
  • src/backend/core/services/tiered_storage.py
  • src/backend/core/services/tiered_storage_tasks.py
  • src/backend/core/signals.py
  • src/backend/core/tests/commands/test_verify_tiered_storage.py
  • src/backend/core/tests/services/test_tiered_storage.py
  • src/backend/core/tests/tasks/test_tiered_storage_tasks.py
  • src/backend/messages/settings.py

Comment thread src/backend/core/services/tiered_storage_tasks.py Outdated
Comment thread src/backend/core/services/tiered_storage.py Outdated
Comment on lines +322 to +619
def test_re_encrypt_no_keys_configured(self):
"""Test that re-encrypt fails when no keys are configured."""
from unittest.mock import patch

stdout = StringIO()
stderr = StringIO()

with patch("core.services.tiered_storage.settings") as mock_settings:
mock_settings.STORAGES = {"message-blobs": {"OPTIONS": {}}}
mock_settings.MESSAGES_BLOB_ENCRYPTION_KEYS = {}
mock_settings.MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID = 0

call_command(
"verify_tiered_storage",
re_encrypt=True,
stdout=stdout,
stderr=stderr,
)

assert "No encryption keys configured" in stderr.getvalue()

def test_all_blobs_already_current_key(self):
"""Test that re-encrypt reports success when all blobs use current key."""
key = secrets.token_hex(32)
service = TieredStorageService()
service.encryption_keys = {"1": key}
service.active_key_id = 1

mailbox = factories.MailboxFactory()
blob = mailbox.create_blob(content=b"test", content_type="text/plain")

# Manually encrypt with key 1
compressed = bytes(blob.raw_content)
encrypted, key_id = service.encrypt(compressed)
blob.raw_content = encrypted
blob.encryption_key_id = key_id
blob.save()

stdout = StringIO()
stderr = StringIO()

# Temporarily modify service in command
from unittest.mock import patch

with patch(
"core.management.commands.verify_tiered_storage.TieredStorageService"
) as mock_svc_class:
mock_svc_class.return_value = service

call_command(
"verify_tiered_storage",
re_encrypt=True,
stdout=stdout,
stderr=stderr,
)

assert "All blobs already use the current encryption key" in stdout.getvalue()

def test_re_encrypt_postgres_blob(self):
"""Test re-encrypting a PostgreSQL blob with real encryption."""
import pyzstd

service = TieredStorageService()
old_key = secrets.token_hex(32)
new_key = secrets.token_hex(32)

mailbox = factories.MailboxFactory()
original_content = b"test content for re-encryption" * 20

# Create blob and encrypt with old key (key_id=2)
blob = mailbox.create_blob(content=original_content, content_type="text/plain")
compressed = bytes(blob.raw_content)

service.encryption_keys = {"2": old_key}
service.active_key_id = 2
encrypted, key_id = service.encrypt(compressed)
blob.raw_content = encrypted
blob.encryption_key_id = key_id
blob.save()

# Now configure service for key rotation (new key is "1", old is "2")
service.encryption_keys = {"1": new_key, "2": old_key}
service.active_key_id = 1

stdout = StringIO()
stderr = StringIO()

from unittest.mock import patch

with patch(
"core.management.commands.verify_tiered_storage.TieredStorageService"
) as mock_svc_class:
mock_svc_class.return_value = service

call_command(
"verify_tiered_storage",
re_encrypt=True,
stdout=stdout,
stderr=stderr,
)

output = stdout.getvalue()
assert "Re-encrypted" in output
assert "Re-encrypted: 1" in output

# Verify blob was updated
blob.refresh_from_db()
assert blob.encryption_key_id == 1

# Verify content is still readable
decrypted = service.decrypt(bytes(blob.raw_content), blob.encryption_key_id)
assert pyzstd.decompress(decrypted) == original_content

@pytest.mark.django_db(transaction=True)
def test_re_encrypt_object_storage_blob(self):
"""Test re-encrypting an object storage blob with real encryption."""
import pyzstd

service = TieredStorageService()
old_key = secrets.token_hex(32)
new_key = secrets.token_hex(32)

mailbox = factories.MailboxFactory()
original_content = b"test content for object storage re-encryption" * 20

# Create blob and encrypt with old key (key_id=2)
blob = mailbox.create_blob(content=original_content, content_type="text/plain")
compressed = bytes(blob.raw_content)

service.encryption_keys = {"2": old_key}
service.active_key_id = 2
encrypted, key_id = service.encrypt(compressed)
blob.raw_content = encrypted
blob.encryption_key_id = key_id
blob.save()

old_path = TieredStorageService.compute_storage_key_for_blob(blob)
new_path = TieredStorageService.compute_storage_key(bytes(blob.sha256), 1)

try:
# Upload to storage
service.upload_blob(blob)
blob.storage_location = BlobStorageLocationChoices.OBJECT_STORAGE
blob.raw_content = None
blob.save()

# Configure for key rotation
service.encryption_keys = {"1": new_key, "2": old_key}
service.active_key_id = 1

stdout = StringIO()
stderr = StringIO()

from unittest.mock import patch

with patch(
"core.management.commands.verify_tiered_storage.TieredStorageService"
) as mock_svc_class:
mock_svc_class.return_value = service

call_command(
"verify_tiered_storage",
re_encrypt=True,
stdout=stdout,
stderr=stderr,
)

output = stdout.getvalue()
assert "Re-encrypted: 1" in output

# Verify blob was updated
blob.refresh_from_db()
assert blob.encryption_key_id == 1

# Verify content moved from old path to new path
assert not service.storage.exists(old_path)
assert service.storage.exists(new_path)
downloaded = service.download_blob(blob)
assert pyzstd.decompress(downloaded) == original_content
finally:
for k in (old_path, new_path):
if service.storage.exists(k):
service.storage.delete(k)

def test_dry_run(self):
"""Test that --dry-run shows what would be done without changes."""
service = TieredStorageService()
key = secrets.token_hex(32)
service.encryption_keys = {"1": key}
service.active_key_id = 1

mailbox = factories.MailboxFactory()
blob = mailbox.create_blob(content=b"test", content_type="text/plain")
# key_id=0 means unencrypted, needs re-encryption
blob.encryption_key_id = 0
blob.save()

stdout = StringIO()
stderr = StringIO()

from unittest.mock import patch

with patch(
"core.management.commands.verify_tiered_storage.TieredStorageService"
) as mock_svc_class:
mock_svc_class.return_value = service

call_command(
"verify_tiered_storage",
re_encrypt=True,
dry_run=True,
stdout=stdout,
stderr=stderr,
)

output = stdout.getvalue()
assert "DRY RUN" in output
assert "Would re-encrypt" in output

# Verify blob was NOT modified
blob.refresh_from_db()
assert blob.encryption_key_id == 0

def test_re_encrypt_with_limit(self):
"""Test that --limit restricts number of blobs re-encrypted."""
service = TieredStorageService()
key = secrets.token_hex(32)
service.encryption_keys = {"1": key}
service.active_key_id = 1

mailbox = factories.MailboxFactory()

# Create 3 blobs with key_id=0
for i in range(3):
blob = mailbox.create_blob(
content=f"test content {i}".encode(),
content_type="text/plain",
)
blob.encryption_key_id = 0
blob.save()

stdout = StringIO()
stderr = StringIO()

from unittest.mock import patch

with patch(
"core.management.commands.verify_tiered_storage.TieredStorageService"
) as mock_svc_class:
mock_svc_class.return_value = service

call_command(
"verify_tiered_storage",
re_encrypt=True,
limit=2,
stdout=stdout,
stderr=stderr,
)

output = stdout.getvalue()
assert "Blobs to re-encrypt: 2" in output

def test_re_encrypt_skips_blob_without_content(self):
"""Test that re-encrypt skips PostgreSQL blobs with no content."""
service = TieredStorageService()
key = secrets.token_hex(32)
service.encryption_keys = {"1": key}
service.active_key_id = 1

mailbox = factories.MailboxFactory()
blob = mailbox.create_blob(content=b"test", content_type="text/plain")
blob.encryption_key_id = 0
blob.raw_content = None # Simulate missing content
blob.save()

stdout = StringIO()
stderr = StringIO()

from unittest.mock import patch

with patch(
"core.management.commands.verify_tiered_storage.TieredStorageService"
) as mock_svc_class:
mock_svc_class.return_value = service

call_command(
"verify_tiered_storage",
re_encrypt=True,
stdout=stdout,
stderr=stderr,
)

output = stdout.getvalue()
assert "Skipped: 1" in output
# Blob row left unchanged.
blob.refresh_from_db()
assert blob.encryption_key_id == 0
assert blob.raw_content is None
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

The --re-encrypt tests are configuring the wrong source of truth.

re_encrypt_blobs() branches on django.conf.settings.MESSAGES_BLOB_ENCRYPTION_* before it uses the mocked TieredStorageService. In this block, the tests mostly mutate the returned service instance but never override those Django settings, so they depend on ambient test settings instead of the scenario each test is trying to cover.

🛠️ Suggested pattern
-        with patch(
-            "core.management.commands.verify_tiered_storage.TieredStorageService"
-        ) as mock_svc_class:
+        with override_settings(
+            MESSAGES_BLOB_ENCRYPTION_KEYS={"1": key},
+            MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID=1,
+        ), patch(
+            "core.management.commands.verify_tiered_storage.TieredStorageService"
+        ) as mock_svc_class:
             mock_svc_class.return_value = service

Apply the same idea to the other re_encrypt=True cases so the command and the mocked service see the same encryption configuration.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/commands/test_verify_tiered_storage.py` around lines
322 - 619, The tests that call the management command with re_encrypt=True
mutate the mocked TieredStorageService but fail to patch the settings that
re_encrypt_blobs() reads (MESSAGES_BLOB_ENCRYPTION_KEYS /
MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID), so the command uses ambient Django
settings instead of the test scenario; fix each re_encrypt=True test by patching
the same settings module that re_encrypt_blobs() uses (e.g., patch
"core.services.tiered_storage.settings" or django.conf.settings) and set
MESSAGES_BLOB_ENCRYPTION_KEYS and MESSAGES_BLOB_ENCRYPTION_ACTIVE_KEY_ID to
match the service.encryption_keys and service.active_key_id before you patch
"core.management.commands.verify_tiered_storage.TieredStorageService", ensuring
the command and the mocked TieredStorageService see the same encryption config.

Comment on lines +61 to +159
def test_queues_eligible_blobs_by_age(self):
"""Test that task queues blobs older than cutoff date."""
mailbox = factories.MailboxFactory()

# Create an old blob (should be queued)
old_blob = mailbox.create_blob(
content=b"old content", content_type="text/plain"
)
Blob.objects.filter(id=old_blob.id).update(
created_at=now()
- timedelta(days=settings.TIERED_STORAGE_OFFLOAD_AFTER_DAYS + 1)
)

# Create a new blob (should not be queued)
new_blob = mailbox.create_blob(
content=b"new content", content_type="text/plain"
)

# Mock the delay call to track what gets queued
queued_ids = []
with patch.object(
offload_single_blob_task,
"delay",
side_effect=queued_ids.append,
):
result = offload_blobs_task()

assert result["status"] == "success"
assert str(old_blob.id) in queued_ids
assert str(new_blob.id) not in queued_ids

def test_queues_eligible_blobs_by_size(self):
"""Test that task respects minimum size threshold."""
mailbox = factories.MailboxFactory()

# Create a small blob (may or may not be queued depending on OFFLOAD_MIN_SIZE)
small_blob = mailbox.create_blob(content=b"small", content_type="text/plain")
Blob.objects.filter(id=small_blob.id).update(
created_at=now()
- timedelta(days=settings.TIERED_STORAGE_OFFLOAD_AFTER_DAYS + 1)
)

# Create a large blob (should be queued if old enough)
large_content = b"x" * (settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE + 1000)
large_blob = mailbox.create_blob(
content=large_content, content_type="text/plain"
)
Blob.objects.filter(id=large_blob.id).update(
created_at=now()
- timedelta(days=settings.TIERED_STORAGE_OFFLOAD_AFTER_DAYS + 1)
)

queued_ids = []
with patch.object(
offload_single_blob_task,
"delay",
side_effect=queued_ids.append,
):
result = offload_blobs_task()

assert result["status"] == "success"
assert str(large_blob.id) in queued_ids

# Small blob should only be queued if min_size is 0
if settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0:
assert str(small_blob.id) not in queued_ids

@override_settings(TIERED_STORAGE_OFFLOAD_AFTER_DAYS=0)
def test_immediate_offload_with_zero_days(self):
"""Test that OFFLOAD_AFTER_DAYS=0 offloads blobs immediately.

With CELERY_TASK_ALWAYS_EAGER=True (test settings), calling .delay()
executes the task synchronously, simulating a worker running alongside.
"""
service = TieredStorageService()
mailbox = factories.MailboxFactory()
content = b"immediate offload test content" * 20
blob = mailbox.create_blob(content=content, content_type="text/plain")
storage_key = TieredStorageService.compute_storage_key_for_blob(blob)

try:
# No age manipulation - blob was just created
result = offload_blobs_task()

assert result["status"] == "success"
assert result["queued"] == 1

# With CELERY_TASK_ALWAYS_EAGER, the single blob task already ran
blob.refresh_from_db()
assert blob.storage_location == BlobStorageLocationChoices.OBJECT_STORAGE
assert blob.raw_content is None

# Verify content is still accessible from S3
retrieved = blob.get_content()
assert retrieved == content
finally:
if service.storage.exists(storage_key):
service.storage.delete(storage_key)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

These age-based offload tests still depend on the size filter.

offload_blobs_task() also requires size__gte=settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE. Here the payloads are small, so the tests can fail without ever exercising the age/immediacy logic when the default min size is non-zero.

🛠️ Suggested fix
-    def test_queues_eligible_blobs_by_age(self):
+    `@override_settings`(TIERED_STORAGE_OFFLOAD_MIN_SIZE=0)
+    def test_queues_eligible_blobs_by_age(self):
         """Test that task queues blobs older than cutoff date."""
-    `@override_settings`(TIERED_STORAGE_OFFLOAD_AFTER_DAYS=0)
+    `@override_settings`(
+        TIERED_STORAGE_OFFLOAD_AFTER_DAYS=0,
+        TIERED_STORAGE_OFFLOAD_MIN_SIZE=0,
+    )
     def test_immediate_offload_with_zero_days(self):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/tasks/test_tiered_storage_tasks.py` around lines 61 -
159, The age-based offload tests fail when TIERED_STORAGE_OFFLOAD_MIN_SIZE > 0
because offload_blobs_task filters by size__gte; update the tests
(test_queues_eligible_blobs_by_age and test_immediate_offload_with_zero_days) to
ensure created blobs meet the size threshold (e.g., make content length >=
settings.TIERED_STORAGE_OFFLOAD_MIN_SIZE) or apply
override_settings(TIERED_STORAGE_OFFLOAD_MIN_SIZE=0) for those tests so the
age/immediacy logic is exercised; reference the tests by name and the
offload_blobs_task and TieredStorageService.compute_storage_key_for_blob symbols
when making the change.

Comment thread src/backend/core/services/tiered_storage.py Fixed
Copy link
Copy Markdown
Contributor

@jbpenrath jbpenrath left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just nitpicks

Comment thread src/backend/core/models.py Outdated
Comment thread src/backend/core/models.py
sylvinus and others added 14 commits May 3, 2026 19:07
This allows to use S3-compatible object storage to offload blobs,
making Postgres much lighter. We design for storing ~1B emails on
a single instance.
…f sensitive information'

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
…f sensitive information'

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Comment thread docs/tiered-storage.md
Comment on lines +3 to +8
Blobs (raw RFC822 email bodies and attachments) live in PostgreSQL by
default. Once a blob is older than `TIERED_STORAGE_OFFLOAD_AFTER_DAYS`,
a periodic celery task moves its bytes to S3 and clears the PG row's
`raw_content`. Reads transparently fetch from whichever location the
row points at — application code only ever calls `blob.get_content()`.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you concidered moving message body onto the object store while keeping headers around for longer?

This is generally performance critical (no object store hit and no heavy mail parsing) on email list load, while actually it is OK to store headers around in DB for much longer (months?)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi! We extract enough metadata from the headers to the DB (and to OpenSearch) to be able to have all email lists load without hitting the mime data, so I think it's already the case even if we don't store all headers?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, that will do the job actually I presume.

With all these data, what is the actual footprint of a typical message once tiered in the DB ?

Comment thread docs/tiered-storage.md Outdated
Comment on lines +16 to +18
- **Deduplication**: blobs sharing the same SHA-256 share the same S3
object. The DB is the source of truth; the existence check on S3 is
a defensive guard against external deletions.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice but how is this GCed if data is deleted ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We removed the existence check on S3 - external deletions on S3 are not a risk in the scope

Comment thread src/backend/core/services/tiered_storage.py Fixed
Comment thread src/backend/core/services/tiered_storage.py Dismissed
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 17

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
Makefile (1)

361-367: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

start-e2e is missing message-blobs bucket creation.

create-buckets provisions both buckets, but start-e2e only creates message-imports. Any e2e scenario that exercises tiered-storage blob offload will fail with a no-such-bucket error.

🛠️ Proposed fix
 start-e2e: ## Start e2e services (migrate, seed, etc.)
 	`@echo` "$(BLUE)\n\n| 🔧 Setting up E2E services... \n$(RESET)"
 	@$(COMPOSE_E2E) run --rm backend python manage.py create_bucket --storage message-imports --expire-days 1
+	@$(COMPOSE_E2E) run --rm backend python manage.py create_bucket --storage message-blobs
 	@$(COMPOSE_E2E) run --rm backend python manage.py migrate --noinput
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Makefile` around lines 361 - 367, The start-e2e Makefile target omits
creating the message-blobs bucket, causing tiered-storage tests to fail; update
the start-e2e recipe (target: start-e2e) to run the same create_bucket
invocation used for message-imports but for --storage message-blobs (i.e., add a
line invoking $(COMPOSE_E2E) run --rm backend python manage.py create_bucket
--storage message-blobs --expire-days 1 before migrate/search_index_create) so
both buckets are provisioned during e2e setup.
src/backend/core/factories.py (1)

372-395: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Make upload reservations opt-in in BlobFactory.

mailbox= is still the natural argument for “create a blob related to this mailbox”, but this branch now turns every such factory call into a live upload reservation. That leaks into non-upload fixtures in this module and can keep blobs reachable/GC-ineligible until the reservation expires, which makes lifecycle tests less trustworthy. Please keep plain factory creation side-effect free and add an explicit opt-in for upload-style fixtures.

Possible direction
 class BlobFactory(factory.django.DjangoModelFactory):
@@
     `@classmethod`
     def _create(cls, model_class, *args, **kwargs):
         content = kwargs.pop("content")
         content_type = kwargs.pop("content_type", "application/octet-stream")
         mailbox = kwargs.pop("mailbox", None)
-        if mailbox is not None:
+        reserve_upload = kwargs.pop("reserve_upload", False)
+        if reserve_upload:
+            if mailbox is None:
+                raise ValueError("reserve_upload=True requires mailbox=...")
             return upload_and_reserve_blob(mailbox, content, content_type)
         return models.Blob.objects.create_blob(
             content=content, content_type=content_type
         )

Then only the upload-path fixtures/tests would pass reserve_upload=True.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/backend/core/factories.py` around lines 372 - 395, BlobFactory currently
treats any call with mailbox=... as an upload reservation; change _create to
make reservations opt-in by adding and popping a reserve_upload (or reserve)
boolean kwarg defaulting to False, keep mailbox as the way to associate a
mailbox but only call upload_and_reserve_blob(mailbox, content, content_type)
when mailbox is not None AND reserve_upload is True; otherwise call
models.Blob.objects.create_blob(content=..., content_type=...) so plain factory
creation remains side-effect free; update any tests/fixtures that need the
upload path to pass reserve_upload=True when invoking BlobFactory or its
helpers.
♻️ Duplicate comments (2)
src/backend/core/services/tiered_storage.py (2)

321-329: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't reuse a sibling unless the backing object still exists.

Returning the sibling's (key_id, compression) here makes the caller clear raw_content and flip the row to OBJECT_STORAGE without re-uploading anything. If that sibling row is stale and its bucket object has already been lost, this path throws away the last good copy of the blob. Verify the canonical key exists before short-circuiting; otherwise fall through to storage.save(...).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/backend/core/services/tiered_storage.py` around lines 321 - 329, The
current dedupe short-circuit uses get_existing_sibling(sha256_bytes) and returns
its (key_id, compression) without verifying the backing object exists; instead,
after obtaining sibling (existing_key_id, existing_compression) call the storage
backend check (e.g., a head/exists method on self.storage or equivalent) for
that key_id/canonical object before returning; if the object check fails, do not
return the sibling—fall through to the normal upload path (storage.save(...)) so
the blob is re-uploaded and the row can be safely flipped to OBJECT_STORAGE.
Ensure you reference get_existing_sibling, the sibling tuple, and the storage
existence check when implementing this guard.

193-194: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Don't disable IAM-based S3 configurations.

This gate stays false unless endpoint_url or access_key is set, but a normal AWS S3 setup can rely on instance/role credentials and only provide bucket_name/region. In that deployment the whole offload path silently stays disabled. Gate on the presence of the blob storage config itself, or at least bucket_name, instead.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/backend/core/services/tiered_storage.py` around lines 193 - 194, The
feature gate currently sets self.enabled = bool(opts.get("endpoint_url") or
opts.get("access_key")), which disables IAM-based S3 because it requires
endpoint_url or access_key; update the check to enable the offload when the blob
storage config is present (or at least a bucket is configured). Replace the
self.enabled assignment (using the existing opts variable) with a check like
bool(opts.get("bucket_name")) or fallback to bool(opts) so that instance/role
credential setups (no access_key/endpoint_url) are enabled; keep the rest of the
code and only change how self.enabled is computed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/tiered-storage.md`:
- Around line 106-120: The fenced shell code blocks in the tiered-storage doc
(the ```sh blocks showing openssl rand and MESSAGES_BLOBS_ENCRYPT_KEYS) need a
blank line before and after each fenced block to satisfy markdownlint MD031;
update each occurrence of those fenced snippets so there is an empty line
immediately above the opening ```sh and immediately below the closing ``` (apply
the same change to the repeated shell snippets throughout the document).
- Around line 299-300: The runbook step referencing a "same-sha sibling row" is
inconsistent with the deduplicated Blob model; replace that sentence so
operators are instructed to attempt recovery only via supported mechanisms: if
the DB row's storage_location is OBJECT_STORAGE but the S3 object is missing,
first check for a full system backup and restore the blob into S3 and verify
with `verify --mode=db-to-storage`; if no backup exists, attempt to re-ingest
the original content from the upstream source that produced the Blob (or
reconstruct it from application logs), and only if neither is possible mark the
DB row as unrecoverable (add a status/annotation and create an incident) rather
than copying `raw_content` from a non-existent sibling; update references to
`storage_location=POSTGRES`, `raw_content`, and the `Blob` record in the same
paragraph to reflect these supported recovery options.

In `@src/backend/core/admin.py`:
- Around line 1130-1136: The current exception handler for blob.get_content uses
redirect(".") which returns the user to the POST-only /download/ endpoint
causing a 405; update the redirect to send the user back to the blob change form
instead. In the except block that catches exceptions from blob.get_content
(where logging.exception, capture_exception, and messages.error are called),
replace redirect(".") with a redirect to the admin change view for that blob
(use Django's reverse with the appropriate admin change URL name and the blob's
id or the existing helper that builds the blob change URL) so the flashed error
is shown on the blob edit page.

In `@src/backend/core/api/viewsets/blob.py`:
- Around line 128-135: The current response rounds
settings.MAX_OUTGOING_ATTACHMENT_SIZE to 0 MB for small limits, producing
misleading "max 0 MB" messages; change the formatting logic in the blob upload
handler (the block using uploaded_file.size and
settings.MAX_OUTGOING_ATTACHMENT_SIZE that builds the Response) to display the
limit in a sensible unit: compute max_bytes =
settings.MAX_OUTGOING_ATTACHMENT_SIZE, then if max_bytes >= 1024*1024 format as
f"{max_bytes/(1024*1024):.0f} MB", elif max_bytes >= 1024 format as
f"{max_bytes/1024:.1f} KB", else format as f"{max_bytes} bytes"; use that string
in the error message instead of always using max_mb with .0f.

In `@src/backend/core/checks.py`:
- Around line 16-20: This file imports module-private symbols _MIN_KEY_LEN and
_decode_key from core.services.tiered_storage; replace that coupling by either
(A) promoting those symbols in tiered_storage to public names (e.g., MIN_KEY_LEN
and decode_key) and then import the public names, or (B) add a single public
helper validate_key_entry(...) inside core.services.tiered_storage that
encapsulates the checks currently done with _MIN_KEY_LEN, _decode_key and
normalize_key_entry, then here remove imports of _MIN_KEY_LEN and _decode_key
and call validate_key_entry(...) (or import the newly public
MIN_KEY_LEN/decode_key if you chose promotion); update any call sites in this
file that reference _MIN_KEY_LEN or _decode_key to use the new public API
(validate_key_entry or MIN_KEY_LEN/decode_key) so no private
(leading-underscore) symbols are imported.

In `@src/backend/core/management/commands/re_store_blobs.py`:
- Around line 189-232: _build_worklist currently materializes entire querysets
into memory by calling list(...) for postgres_ids and object_storage_ids; change
it to stream IDs instead (use QuerySet.iterator(chunk_size=...) or
values_list(...).iterator()) and build worklist incrementally until self.limit
(respecting restore_to_pg branch and the distinct("sha256","encryption_key_id")
cohort case) so you never load all IDs at once—replace the list(...) +
concatenation with loops that extend a local worklist (or yield IDs) from
postgres_qs.iterator() then object_storage_qs.iterator(), stopping as soon as
self.limit (>0) is reached; alternatively set a safe default for self.limit if
you want an ops-safety guard.
- Around line 151-170: The loop currently does an unlocked per-row fetch using
Blob.objects.get(id=blob_id) before calling _process_one, which then
re-fetches/refreshes the Blob under the advisory lock; remove the wasted
unlocked fetch and pass blob_id into _process_one instead so the method is
responsible for fetching the row under transaction.atomic() (using
Blob.objects.select_for_update().get(id=blob_id) or refresh_from_db inside the
lock) and for all lock-based checks (e.g., sha256 used to acquire the advisory
lock); update the loop to increment counts based on _process_one's return value
and keep the existing exception handling.

In `@src/backend/core/management/commands/verify_blobs.py`:
- Around line 171-180: The path parser in verify_blobs.py currently only checks
parts length and "blobs" prefix then converts parts[3] to bytes, allowing inputs
like blobs/1/zzz/<valid_sha> to slip through; update the try block that parses
obj_name (variables parts, key_id, sha_bytes) to also validate that parts[3] has
length 64 and that parts[2] == parts[3][:3] before calling bytes.fromhex,
raising ValueError when those checks fail so malformed shard/sha combos
increment invalid_count and produce the INVALID PATH warning via
self.stdout.write.
- Around line 159-163: Replace the silent return in the exception handler for
the storage listing with a raised CommandError so the command exits non-zero: in
the try/except around the call to self._list_storage_objects("blobs/") update
the except block to raise CommandError(f"Failed to list storage objects: {e}")
instead of calling return, and add CommandError to the imports from
django.core.management.base so the symbol is available.

In `@src/backend/core/mda/outbound.py`:
- Around line 411-415: In the MIME composition except block (the one catching
drf.exceptions.ValidationError then a generic except Exception as e in
src/backend/core/mda/outbound.py around the MIME composition code that logs
"Failed to compose MIME for message %s: %s"), replace the current logger.error
call with a logger.exception (or logger.error(..., exc_info=True)) so the full
traceback is preserved and sent to Sentry; keep the existing return False
behavior and the surrounding exception handling (including the
drf.exceptions.ValidationError pass-through) intact.

In
`@src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py`:
- Around line 272-281: The migration currently uses
core.models._default_upload_expiry as the default for the expires_at
DateTimeField which couples the migration to application code; define a local
callable named _default_upload_expiry at the top of this migration (matching the
pattern used for split_multi_message_attachments) that imports
django.utils.timezone and datetime.timedelta, computes timezone.now() +
timedelta(hours=1) (or the same TTL used elsewhere) and replace
default=core.models._default_upload_expiry with default=_default_upload_expiry
on the expires_at models.DateTimeField so the migration is self-contained and
immutable.

In `@src/backend/core/models.py`:
- Around line 2198-2232: user_can_access performs a deep OR'ed join (Message
blob_id OR draft_blob_id → Thread → ThreadAccess → Mailbox → MailboxAccess) that
will be very slow at scale; replace the final Message-based branch with a
mailbox-scoped EXISTS pattern: first compute a subquery of mailbox ids the user
can access via MailboxAccess/ThreadAccess (i.e. a MailboxAccess-filtered mailbox
id set using the Thread→ThreadAccess→Mailbox link), then check EXISTS against
each referencing table separately (Message with blob_id, Message with
draft_blob_id, Attachment, MessageTemplate, MailboxBlob) using that mailbox id
set — or alternatively implement a covering index on Message(blob_id,
draft_blob_id) if schema change is acceptable; update the Message-related check
in user_can_access accordingly (referencing MailboxBlob, Attachment,
MessageTemplate, Message, ThreadAccess, MailboxAccess symbols).
- Around line 2157-2165: The create_blob (caller via upload_and_reserve_blob)
forwards unfiltered **kwargs into self.create(...) which can collide with
explicit parameters (sha256, size, content_type, compression, raw_content,
encryption_key_id) and raise TypeError; fix this by filtering/whitelisting the
kwargs before calling self.create — e.g., build a new dict of allowed optional
fields (exclude reserved names like "sha256", "size", "content_type",
"compression", "raw_content", "encryption_key_id") or explicitly pop those keys
from kwargs, then pass only the safe dict into the self.create(...) call
referenced in the diff so no duplicate keyword arguments can occur.

In `@src/backend/core/services/importer/eml_tasks.py`:
- Around line 73-81: The code assumes an S3 client by accessing
message_imports_storage.connection.meta.client which raises AttributeError for
non-S3 backends; update the logic to detect whether message_imports_storage
provides an S3 client (e.g., check hasattr(message_imports_storage,
"connection") and hasattr(message_imports_storage.connection, "meta") and
hasattr(message_imports_storage.connection.meta, "client")), and if present use
s3_client.get_object(...) as before to set resp and file_content, otherwise fall
back to backend-agnostic message_imports_storage.open(file_key, "rb") and read
settings.MAX_INCOMING_EMAIL_SIZE+1 bytes (matching the previous Range semantics)
into file_content; keep existing variable names (message_imports_storage,
s3_client, resp, file_content) and preserve the same error handling for genuine
S3 errors.

In `@src/backend/core/services/tiered_storage.py`:
- Around line 500-511: Deferred orphan cleanup callback (_run passed to
transaction.on_commit) calls delete_if_orphaned without any advisory lock,
risking deleting blobs recreated concurrently; modify the callback to reacquire
the per-sha lock and DB transaction before running the check-and-delete: wrap
the body of _run in a new transaction.atomic() and call sha256_advisory_lock(s)
(or alternatively add the same locking inside delete_if_orphaned()), then call
delete_if_orphaned(s, k) inside that locked/atomic block so the exists() →
storage.delete() sequence is protected; keep transaction.on_commit(_run) as the
trigger.

In `@src/backend/core/tests/services/test_tiered_storage.py`:
- Around line 1277-1288: The test currently relies on BlobFactory creating three
distinct DB rows but DB-level dedup means they collapse to one; either rename
the test to reflect the single-row behavior or explicitly seed distinct rows so
the multi-row cohort path is exercised: change the Blob creation loop using
factories.BlobFactory to ensure unique sha per row (e.g., append a small unique
nonce to content when calling BlobFactory) or bypass dedup by creating rows
directly via the Blob model (use Blob.objects.create with distinct sha values)
so the later refresh_from_db() loop operates on multiple DB rows; update
assertions accordingly (refer to BlobFactory, blobs, and refresh_from_db()).

In `@src/backend/messages/celery_app.py`:
- Around line 53-68: The scheduled tasks "offload-blobs-to-object-storage" and
"gc-orphan-blobs" currently use the "default" queue; change their "options":
{"queue": "..."} entries to use a dedicated queue name like "blob-maintenance"
(e.g., options: {"queue": "blob-maintenance"}) so long-running blob maintenance
is isolated, and ensure any operator documentation or deployment configs create
a worker with appropriate concurrency (e.g., --concurrency=1) bound to the
"blob-maintenance" queue; update both task entries referencing
core.services.tiered_storage_tasks.offload_blobs_task and
core.services.blob_gc.gc_orphan_blobs_task.

---

Outside diff comments:
In `@Makefile`:
- Around line 361-367: The start-e2e Makefile target omits creating the
message-blobs bucket, causing tiered-storage tests to fail; update the start-e2e
recipe (target: start-e2e) to run the same create_bucket invocation used for
message-imports but for --storage message-blobs (i.e., add a line invoking
$(COMPOSE_E2E) run --rm backend python manage.py create_bucket --storage
message-blobs --expire-days 1 before migrate/search_index_create) so both
buckets are provisioned during e2e setup.

In `@src/backend/core/factories.py`:
- Around line 372-395: BlobFactory currently treats any call with mailbox=... as
an upload reservation; change _create to make reservations opt-in by adding and
popping a reserve_upload (or reserve) boolean kwarg defaulting to False, keep
mailbox as the way to associate a mailbox but only call
upload_and_reserve_blob(mailbox, content, content_type) when mailbox is not None
AND reserve_upload is True; otherwise call
models.Blob.objects.create_blob(content=..., content_type=...) so plain factory
creation remains side-effect free; update any tests/fixtures that need the
upload path to pass reserve_upload=True when invoking BlobFactory or its
helpers.

---

Duplicate comments:
In `@src/backend/core/services/tiered_storage.py`:
- Around line 321-329: The current dedupe short-circuit uses
get_existing_sibling(sha256_bytes) and returns its (key_id, compression) without
verifying the backing object exists; instead, after obtaining sibling
(existing_key_id, existing_compression) call the storage backend check (e.g., a
head/exists method on self.storage or equivalent) for that key_id/canonical
object before returning; if the object check fails, do not return the
sibling—fall through to the normal upload path (storage.save(...)) so the blob
is re-uploaded and the row can be safely flipped to OBJECT_STORAGE. Ensure you
reference get_existing_sibling, the sibling tuple, and the storage existence
check when implementing this guard.
- Around line 193-194: The feature gate currently sets self.enabled =
bool(opts.get("endpoint_url") or opts.get("access_key")), which disables
IAM-based S3 because it requires endpoint_url or access_key; update the check to
enable the offload when the blob storage config is present (or at least a bucket
is configured). Replace the self.enabled assignment (using the existing opts
variable) with a check like bool(opts.get("bucket_name")) or fallback to
bool(opts) so that instance/role credential setups (no access_key/endpoint_url)
are enabled; keep the rest of the code and only change how self.enabled is
computed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 26aa2e00-a560-4695-816c-84b0d4ec3879

📥 Commits

Reviewing files that changed from the base of the PR and between 6c43496 and fb9b0f4.

⛔ Files ignored due to path filters (1)
  • src/frontend/src/features/api/gen/blob/blob.ts is excluded by !**/gen/**
📒 Files selected for processing (60)
  • .github/workflows/messages.yml
  • Makefile
  • docs/architecture.md
  • docs/env.md
  • docs/tiered-storage.md
  • env.d/development/backend.defaults
  • src/backend/core/admin.py
  • src/backend/core/api/openapi.json
  • src/backend/core/api/serializers.py
  • src/backend/core/api/viewsets/blob.py
  • src/backend/core/api/viewsets/metrics.py
  • src/backend/core/apps.py
  • src/backend/core/checks.py
  • src/backend/core/enums.py
  • src/backend/core/factories.py
  • src/backend/core/management/commands/delete_orphan_attachments.py
  • src/backend/core/management/commands/re_store_blobs.py
  • src/backend/core/management/commands/verify_blobs.py
  • src/backend/core/mda/autoreply.py
  • src/backend/core/mda/draft.py
  • src/backend/core/mda/inbound_create.py
  • src/backend/core/mda/outbound.py
  • src/backend/core/migrations/0026_blob_encryption_key_id_blob_storage_location_and_more.py
  • src/backend/core/models.py
  • src/backend/core/services/blob_gc.py
  • src/backend/core/services/importer/eml_tasks.py
  • src/backend/core/services/search/coalescer.py
  • src/backend/core/services/search/search.py
  • src/backend/core/services/tiered_storage.py
  • src/backend/core/services/tiered_storage_tasks.py
  • src/backend/core/signals.py
  • src/backend/core/templates/admin/core/blob/change_form.html
  • src/backend/core/tests/api/test_attachments.py
  • src/backend/core/tests/api/test_draft_attachments.py
  • src/backend/core/tests/api/test_inbound_mta.py
  • src/backend/core/tests/api/test_mailbox_usage_metrics.py
  • src/backend/core/tests/api/test_maildomain_users_metrics.py
  • src/backend/core/tests/api/test_messages_delete.py
  • src/backend/core/tests/commands/__init__.py
  • src/backend/core/tests/commands/test_re_store_blobs.py
  • src/backend/core/tests/commands/test_verify_blobs.py
  • src/backend/core/tests/conftest.py
  • src/backend/core/tests/exporter/test_export_task.py
  • src/backend/core/tests/importer/test_file_import.py
  • src/backend/core/tests/mda/test_autoreply.py
  • src/backend/core/tests/mda/test_outbound.py
  • src/backend/core/tests/models/test_blob.py
  • src/backend/core/tests/search/test_e2e.py
  • src/backend/core/tests/search/test_e2e_modifiers.py
  • src/backend/core/tests/services/__init__.py
  • src/backend/core/tests/services/test_tiered_storage.py
  • src/backend/core/tests/tasks/__init__.py
  • src/backend/core/tests/tasks/test_task_importer.py
  • src/backend/core/tests/tasks/test_tiered_storage_tasks.py
  • src/backend/core/tests/test_checks.py
  • src/backend/core/tests/test_signals.py
  • src/backend/core/utils.py
  • src/backend/messages/celery_app.py
  • src/backend/messages/settings.py
  • src/backend/pyproject.toml
💤 Files with no reviewable changes (2)
  • src/backend/core/tests/exporter/test_export_task.py
  • src/backend/core/management/commands/delete_orphan_attachments.py

Comment thread docs/tiered-storage.md
Comment on lines +106 to +120
1. Generate a random secret string. Use `openssl rand` or equivalent
so the input is genuinely high-entropy:
```sh
openssl rand -base64 32
```
Startup emits a warning if any configured secret is shorter than
32 characters. The warning is a length check, not an entropy
measurement — `"a" * 32` passes silently. Treat the floor as a
tripwire for typos, not a security guarantee.
2. Add it to `MESSAGES_BLOBS_ENCRYPT_KEYS` as a JSON dict. Every
entry must spell out `algo` and `secret`. Add `"active": true`
to the entry whose key new blobs should be encrypted with:
```sh
MESSAGES_BLOBS_ENCRYPT_KEYS='{"1": {"algo": "aes-gcm", "secret": "<the secret>", "active": true}}'
```
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add blank lines around fenced code blocks here and in the repeated sections below.

This section trips markdownlint MD031, and the same pattern repeats across the other shell snippets in the document. Adding blank lines before and after each fence should keep docs lint green.

🧰 Tools
🪛 markdownlint-cli2 (0.22.1)

[warning] 108-108: Fenced code blocks should be surrounded by blank lines

(MD031, blanks-around-fences)


[warning] 110-110: Fenced code blocks should be surrounded by blank lines

(MD031, blanks-around-fences)


[warning] 118-118: Fenced code blocks should be surrounded by blank lines

(MD031, blanks-around-fences)


[warning] 120-120: Fenced code blocks should be surrounded by blank lines

(MD031, blanks-around-fences)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/tiered-storage.md` around lines 106 - 120, The fenced shell code blocks
in the tiered-storage doc (the ```sh blocks showing openssl rand and
MESSAGES_BLOBS_ENCRYPT_KEYS) need a blank line before and after each fenced
block to satisfy markdownlint MD031; update each occurrence of those fenced
snippets so there is an empty line immediately above the opening ```sh and
immediately below the closing ``` (apply the same change to the repeated shell
snippets throughout the document).

Comment thread docs/tiered-storage.md
Comment on lines +299 to +300
| DB row says `OBJECT_STORAGE` but S3 object missing | `verify --mode=db-to-storage` reports `MISSING` | The blob is unreadable. If a same-sha sibling row still has `raw_content` in PG, restore by setting the missing row to `storage_location=POSTGRES` with that `raw_content`. Otherwise restore from backup. |
| S3 object with no DB row | `verify --mode=storage-to-db` reports `ORPHAN` | After confirming, `aws s3 rm` the object. |
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

The same-sha sibling row recovery step contradicts the earlier dedup model.

Earlier in the doc, identical content is described as a single Blob row keyed by hash. In that model there is no second DB row to copy raw_content from, so this incident runbook points operators to a recovery path that should never exist. Please replace it with the real supported recovery path here.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/tiered-storage.md` around lines 299 - 300, The runbook step referencing
a "same-sha sibling row" is inconsistent with the deduplicated Blob model;
replace that sentence so operators are instructed to attempt recovery only via
supported mechanisms: if the DB row's storage_location is OBJECT_STORAGE but the
S3 object is missing, first check for a full system backup and restore the blob
into S3 and verify with `verify --mode=db-to-storage`; if no backup exists,
attempt to re-ingest the original content from the upstream source that produced
the Blob (or reconstruct it from application logs), and only if neither is
possible mark the DB row as unrecoverable (add a status/annotation and create an
incident) rather than copying `raw_content` from a non-existent sibling; update
references to `storage_location=POSTGRES`, `raw_content`, and the `Blob` record
in the same paragraph to reflect these supported recovery options.

Comment thread src/backend/core/admin.py
Comment on lines +1130 to +1136
try:
content = blob.get_content()
except Exception as exc: # pylint: disable=broad-exception-caught
logging.exception("Failed to fetch blob %s for admin download", blob.id)
capture_exception()
messages.error(request, f"Failed to download blob: {exc}")
return redirect(".")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Redirect back to the blob change form on download failure.

redirect(".") keeps the browser on the POST-only /download/ URL. After the redirect, the browser will issue a GET there and land on 405 Method Not Allowed instead of returning to the blob page with the flashed error.

Suggested fix
         try:
             content = blob.get_content()
         except Exception as exc:  # pylint: disable=broad-exception-caught
             logging.exception("Failed to fetch blob %s for admin download", blob.id)
             capture_exception()
             messages.error(request, f"Failed to download blob: {exc}")
-            return redirect(".")
+            return redirect("..")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/backend/core/admin.py` around lines 1130 - 1136, The current exception
handler for blob.get_content uses redirect(".") which returns the user to the
POST-only /download/ endpoint causing a 405; update the redirect to send the
user back to the blob change form instead. In the except block that catches
exceptions from blob.get_content (where logging.exception, capture_exception,
and messages.error are called), replace redirect(".") with a redirect to the
admin change view for that blob (use Django's reverse with the appropriate admin
change URL name and the blob's id or the existing helper that builds the blob
change URL) so the flashed error is shown on the blob edit page.

Comment on lines +128 to +135
if uploaded_file.size > settings.MAX_OUTGOING_ATTACHMENT_SIZE:
max_mb = settings.MAX_OUTGOING_ATTACHMENT_SIZE / (1024 * 1024)
return Response(
{
"error": (
f"File too large: {uploaded_file.size} bytes "
f"(max {max_mb:.0f} MB per attachment)."
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Don't round sub-MiB limits down to 0 MB.

With small test or self-hosted limits, this formats the cap as max 0 MB per attachment, which makes the 413 response misleading. Prefer bytes/KB for small values, or keep at least one decimal place.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/backend/core/api/viewsets/blob.py` around lines 128 - 135, The current
response rounds settings.MAX_OUTGOING_ATTACHMENT_SIZE to 0 MB for small limits,
producing misleading "max 0 MB" messages; change the formatting logic in the
blob upload handler (the block using uploaded_file.size and
settings.MAX_OUTGOING_ATTACHMENT_SIZE that builds the Response) to display the
limit in a sensible unit: compute max_bytes =
settings.MAX_OUTGOING_ATTACHMENT_SIZE, then if max_bytes >= 1024*1024 format as
f"{max_bytes/(1024*1024):.0f} MB", elif max_bytes >= 1024 format as
f"{max_bytes/1024:.1f} KB", else format as f"{max_bytes} bytes"; use that string
in the error message instead of always using max_mb with .0f.

Comment on lines +16 to +20
from core.services.tiered_storage import (
_MIN_KEY_LEN,
_decode_key,
normalize_key_entry,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial | 💤 Low value

Avoid importing private symbols across modules.

_MIN_KEY_LEN, _decode_key, and normalize_key_entry are imported from core.services.tiered_storage — but the leading underscores mark the first two as module-private. Reaching across modules for them couples this check file to internal implementation details and silences the convention. Either promote them in the source module (rename to public, e.g., MIN_KEY_LEN/decode_key) or expose a single validate_key_entry(...) helper from tiered_storage and call that here.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/backend/core/checks.py` around lines 16 - 20, This file imports
module-private symbols _MIN_KEY_LEN and _decode_key from
core.services.tiered_storage; replace that coupling by either (A) promoting
those symbols in tiered_storage to public names (e.g., MIN_KEY_LEN and
decode_key) and then import the public names, or (B) add a single public helper
validate_key_entry(...) inside core.services.tiered_storage that encapsulates
the checks currently done with _MIN_KEY_LEN, _decode_key and
normalize_key_entry, then here remove imports of _MIN_KEY_LEN and _decode_key
and call validate_key_entry(...) (or import the newly public
MIN_KEY_LEN/decode_key if you chose promotion); update any call sites in this
file that reference _MIN_KEY_LEN or _decode_key to use the new public API
(validate_key_entry or MIN_KEY_LEN/decode_key) so no private
(leading-underscore) symbols are imported.

Comment on lines +2198 to +2232
def user_can_access(self, user, blob_id) -> bool:
"""Authz: can ``user`` legitimately read this blob's content?

A user can access a blob if any of:

- They have an ``Attachment`` mailbox-access for an Attachment
referencing the blob.
- They have a ``MessageTemplate`` mailbox-access for a template
referencing the blob.
- They have ``ThreadAccess`` to the thread of any Message whose
``blob`` or ``draft_blob`` is the blob.
- The blob has an active upload reservation
(``MailboxBlob.expires_at > now()``) on a mailbox the user
can access (covers the JMAP upload-then-attach window).
"""
if user is None or not getattr(user, "is_authenticated", False):
return False

return (
MailboxBlob.objects.filter(
blob_id=blob_id,
mailbox__accesses__user=user,
expires_at__gt=timezone.now(),
).exists()
or Attachment.objects.filter(
blob_id=blob_id, mailbox__accesses__user=user
).exists()
or MessageTemplate.objects.filter(
blob_id=blob_id, mailbox__accesses__user=user
).exists()
or Message.objects.filter(
models.Q(blob_id=blob_id) | models.Q(draft_blob_id=blob_id),
thread__accesses__mailbox__accesses__user=user,
).exists()
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Confirm the call sites + the indexes Django actually creates for Message.blob_id / draft_blob_id.
rg -nP --type=py -C3 '\buser_can_access\s*\('
ast-grep --pattern $'class Message($_):
  $$$
  blob = $_
  $$$
'

Repository: suitenumerique/messages

Length of output: 14554


🏁 Script executed:

# Read the full user_can_access method
sed -n '2198,2232p' src/backend/core/models.py

Repository: suitenumerique/messages

Length of output: 1542


🏁 Script executed:

# Check the Message model's Meta class for any existing indexes
rg -A 20 'class Message\(BaseModel\)' src/backend/core/models.py | grep -A 15 'class Meta'

Repository: suitenumerique/messages

Length of output: 49


🏁 Script executed:

# Check if there are any migrations related to Message indexes
fd migration src/backend --type f | xargs grep -l 'Message.*index\|blob.*index' 2>/dev/null | head -5

Repository: suitenumerique/messages

Length of output: 49


🏁 Script executed:

# Look for the Blob model definition and its manager
rg -B 5 -A 30 'class BlobManager' src/backend/core/models.py

Repository: suitenumerique/messages

Length of output: 1188


🏁 Script executed:

# Find the exact location and content of Message.Meta
sed -n '1919,1924p' src/backend/core/models.py

Repository: suitenumerique/messages

Length of output: 231


🏁 Script executed:

# Search for all index definitions in the entire models.py to see the pattern used
rg 'indexes\s*=\s*\[' src/backend/core/models.py -A 5

Repository: suitenumerique/messages

Length of output: 1509


🏁 Script executed:

# Check if there are any db_index=True or indexes on blob/draft_blob fields
rg 'blob.*db_index|draft_blob.*db_index' src/backend/core/models.py

Repository: suitenumerique/messages

Length of output: 49


🏁 Script executed:

# Look at all Message model-related migrations to see index history
fd -e py src/backend/core/migrations | xargs grep -l "Message" | head -3 | xargs ls -lh

Repository: suitenumerique/messages

Length of output: 1567


🏁 Script executed:

# Count the join chain depth in the query
# Message → Thread → ThreadAccess → Mailbox → MailboxAccess → User
# Let's verify each relation
rg 'class ThreadAccess' src/backend/core/models.py -A 20 | head -30

Repository: suitenumerique/messages

Length of output: 1365


🏁 Script executed:

# Check MailboxAccess structure
rg 'class MailboxAccess' src/backend/core/models.py -A 15 | head -25

Repository: suitenumerique/messages

Length of output: 573


🏁 Script executed:

# Search for any TODO, FIXME, or optimization notes related to user_can_access or blob download
rg 'TODO|FIXME|XXX|optimize|perf' src/backend/core/models.py | grep -i 'user_can_access\|blob\|authz'

Repository: suitenumerique/messages

Length of output: 49


🏁 Script executed:

# Check if there are any recent related changes or comments
rg -B 2 'user_can_access' src/backend/core/models.py | head -20

Repository: suitenumerique/messages

Length of output: 439


🏁 Script executed:

# Final verification: check that the OR pattern isn't already handled elsewhere
rg 'Message.*blob_id\|draft_blob_id' src/backend/core/models.py

Repository: suitenumerique/messages

Length of output: 49


🏁 Script executed:

# Confirm no recent migration addressing this
ls -lt src/backend/core/migrations/*.py 2>/dev/null | head -5

Repository: suitenumerique/messages

Length of output: 625


user_can_access does a 4-level relational walk on a hot authz path — this warrants optimization before scaling to 1B messages.

The final branch joins Message → Thread → ThreadAccess → Mailbox → MailboxAccess and filters on (blob_id OR draft_blob_id). This runs on every authenticated blob download (blob.py:213) and the planner's behavior on the OR + deep join is sensitive to selectivity:

  • For deduplicated blobs shared across many messages (logos, forwarded attachments, common content), the inner Message set is large and the join fans out before the user filter narrows it.
  • Automatic FK indexes only cover single-column equality; the OR on two columns is not optimized.
  • The first three .exists() calls short-circuit typical "upload-then-attach" workflows, so this deep branch runs precisely for received-mail downloads—the highest-volume case at 1B-message scale.

Two concrete optimization paths to consider:

  1. Rewrite as a single subquery against MailboxAccess filtered by user=user, then EXISTS over the four reference tables intersected with that mailbox set — lets the planner short-circuit per-table.
  2. Add a covering index on Message(blob_id, draft_blob_id) (FK indexes alone do not cover the OR; a UNION ALL of two equality EXISTS typically plans better than OR).

Both require non-trivial work, but this path gates every body fetch and deserves priority before hitting the stated 1B-message target.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/backend/core/models.py` around lines 2198 - 2232, user_can_access
performs a deep OR'ed join (Message blob_id OR draft_blob_id → Thread →
ThreadAccess → Mailbox → MailboxAccess) that will be very slow at scale; replace
the final Message-based branch with a mailbox-scoped EXISTS pattern: first
compute a subquery of mailbox ids the user can access via
MailboxAccess/ThreadAccess (i.e. a MailboxAccess-filtered mailbox id set using
the Thread→ThreadAccess→Mailbox link), then check EXISTS against each
referencing table separately (Message with blob_id, Message with draft_blob_id,
Attachment, MessageTemplate, MailboxBlob) using that mailbox id set — or
alternatively implement a covering index on Message(blob_id, draft_blob_id) if
schema change is acceptable; update the Message-related check in user_can_access
accordingly (referencing MailboxBlob, Attachment, MessageTemplate, Message,
ThreadAccess, MailboxAccess symbols).

Comment on lines 73 to +81
message_imports_storage = storages["message-imports"]
with message_imports_storage.open(file_key, "rb") as file:
file_content = file.read()
s3_client = message_imports_storage.connection.meta.client
limit = settings.MAX_INCOMING_EMAIL_SIZE
resp = s3_client.get_object(
Bucket=message_imports_storage.bucket_name,
Key=file_key,
Range=f"bytes=0-{limit}", # one byte past the limit
)
file_content = resp["Body"].read()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Hard AttributeError on non-S3 message-imports storage backends.

message_imports_storage.connection.meta.client is a boto3/S3Boto3Storage-specific attribute chain. Any other backend (e.g., FileSystemStorage used in local dev or CI without a real S3) raises AttributeError, which the outer except Exception swallows as the generic "An error occurred while processing the EML file." message — making the failure very hard to diagnose.

The previous path used storage.open(), which is backend-agnostic. The switch dropped backward compatibility for non-S3 deployments.

🛡️ Proposed fix — graceful fallback to storage.open() for non-S3 backends
-        message_imports_storage = storages["message-imports"]
-        s3_client = message_imports_storage.connection.meta.client
-        limit = settings.MAX_INCOMING_EMAIL_SIZE
-        resp = s3_client.get_object(
-            Bucket=message_imports_storage.bucket_name,
-            Key=file_key,
-            Range=f"bytes=0-{limit}",  # one byte past the limit
-        )
-        file_content = resp["Body"].read()
+        message_imports_storage = storages["message-imports"]
+        limit = settings.MAX_INCOMING_EMAIL_SIZE
+        try:
+            # S3-compatible path: use a Range request so an oversized upload
+            # can't OOM the worker before the size check fires.
+            s3_client = message_imports_storage.connection.meta.client
+            resp = s3_client.get_object(
+                Bucket=message_imports_storage.bucket_name,
+                Key=file_key,
+                Range=f"bytes=0-{limit}",  # bytes 0-limit inclusive = limit+1 bytes max
+            )
+            file_content = resp["Body"].read()
+        except AttributeError:
+            # Non-S3 backend (e.g. FileSystemStorage in development): fall back to
+            # a bounded read via the standard Django storage API.
+            logger.debug(
+                "message-imports storage is not S3-compatible; "
+                "falling back to storage.open() for %s",
+                file_key,
+            )
+            with message_imports_storage.open(file_key) as f:
+                file_content = f.read(limit + 1)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
message_imports_storage = storages["message-imports"]
with message_imports_storage.open(file_key, "rb") as file:
file_content = file.read()
s3_client = message_imports_storage.connection.meta.client
limit = settings.MAX_INCOMING_EMAIL_SIZE
resp = s3_client.get_object(
Bucket=message_imports_storage.bucket_name,
Key=file_key,
Range=f"bytes=0-{limit}", # one byte past the limit
)
file_content = resp["Body"].read()
message_imports_storage = storages["message-imports"]
limit = settings.MAX_INCOMING_EMAIL_SIZE
try:
# S3-compatible path: use a Range request so an oversized upload
# can't OOM the worker before the size check fires.
s3_client = message_imports_storage.connection.meta.client
resp = s3_client.get_object(
Bucket=message_imports_storage.bucket_name,
Key=file_key,
Range=f"bytes=0-{limit}", # bytes 0-limit inclusive = limit+1 bytes max
)
file_content = resp["Body"].read()
except AttributeError:
# Non-S3 backend (e.g. FileSystemStorage in development): fall back to
# a bounded read via the standard Django storage API.
logger.debug(
"message-imports storage is not S3-compatible; "
"falling back to storage.open() for %s",
file_key,
)
with message_imports_storage.open(file_key) as f:
file_content = f.read(limit + 1)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/backend/core/services/importer/eml_tasks.py` around lines 73 - 81, The
code assumes an S3 client by accessing
message_imports_storage.connection.meta.client which raises AttributeError for
non-S3 backends; update the logic to detect whether message_imports_storage
provides an S3 client (e.g., check hasattr(message_imports_storage,
"connection") and hasattr(message_imports_storage.connection, "meta") and
hasattr(message_imports_storage.connection.meta, "client")), and if present use
s3_client.get_object(...) as before to set resp and file_content, otherwise fall
back to backend-agnostic message_imports_storage.open(file_key, "rb") and read
settings.MAX_INCOMING_EMAIL_SIZE+1 bytes (matching the previous Range semantics)
into file_content; keep existing variable names (message_imports_storage,
s3_client, resp, file_content) and preserve the same error handling for genuine
S3 errors.

Comment on lines +500 to +511
def _run(s=sha256_bytes, k=key_id):
try:
self.delete_if_orphaned(s, k)
except Exception: # pylint: disable=broad-except
logger.exception(
"Post-commit S3 cleanup failed for sha=%s key_id=%d; "
"verify_blobs --mode=storage-to-db will list strays",
s.hex()[:8],
k,
)

transaction.on_commit(_run)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Reacquire the per-sha lock in deferred orphan cleanup.

defer_delete_if_orphaned() runs after commit with no advisory lock, and delete_if_orphaned() does a plain exists()storage.delete() sequence. A concurrent offload/restore/rotation can recreate the same (sha256, key_id) object between those two operations, and this callback will delete bytes that a fresh DB row now references. Run the orphan check and delete under a new transaction.atomic(), sha256_advisory_lock(...) block inside the callback, or move that locking into delete_if_orphaned() itself.

Also applies to: 513-542

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/backend/core/services/tiered_storage.py` around lines 500 - 511, Deferred
orphan cleanup callback (_run passed to transaction.on_commit) calls
delete_if_orphaned without any advisory lock, risking deleting blobs recreated
concurrently; modify the callback to reacquire the per-sha lock and DB
transaction before running the check-and-delete: wrap the body of _run in a new
transaction.atomic() and call sha256_advisory_lock(s) (or alternatively add the
same locking inside delete_if_orphaned()), then call delete_if_orphaned(s, k)
inside that locked/atomic block so the exists() → storage.delete() sequence is
protected; keep transaction.on_commit(_run) as the trigger.

Comment thread src/backend/core/tests/services/test_tiered_storage.py
Comment on lines +53 to +68
"offload-blobs-to-object-storage": {
"task": "core.services.tiered_storage_tasks.offload_blobs_task",
"schedule": 3600.0, # Every hour
"options": {"queue": "default"},
},
"gc-orphan-blobs": {
# Drains the Redis candidate set populated by reference-source
# post_delete signals. Hourly with a 55-min in-task budget;
# see ``core/services/blob_gc.py``. The "full" mode (which
# walks every Blob row as a safety net) is invoked manually
# via ``celery call core.services.blob_gc.gc_orphan_blobs_task --kwargs '{"mode": "full"}'``
# on whatever cadence the operator wants — typically weekly.
"task": "core.services.blob_gc.gc_orphan_blobs_task",
"schedule": 3600.0,
"options": {"queue": "default"},
},
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider isolating long-running blob maintenance tasks from the "default" queue.

Both offload-blobs-to-object-storage and gc-orphan-blobs carry a documented ~55-minute per-tick budget and share the "default" queue with general workloads. In the worst case, a single worker picking up both in one beat cycle can starve other "default" tasks for up to ~110 minutes.

A dedicated queue (e.g., "blob-maintenance") would:

  • bound the blast radius to blob ops only,
  • allow independent concurrency/worker settings (e.g., --concurrency=1), and
  • keep latency predictable for unrelated "default" work.
📋 Suggested queue change
         "offload-blobs-to-object-storage": {
             "task": "core.services.tiered_storage_tasks.offload_blobs_task",
             "schedule": 3600.0,
-            "options": {"queue": "default"},
+            "options": {"queue": "blob-maintenance"},
         },
         "gc-orphan-blobs": {
             "task": "core.services.blob_gc.gc_orphan_blobs_task",
             "schedule": 3600.0,
-            "options": {"queue": "default"},
+            "options": {"queue": "blob-maintenance"},
         },
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/backend/messages/celery_app.py` around lines 53 - 68, The scheduled tasks
"offload-blobs-to-object-storage" and "gc-orphan-blobs" currently use the
"default" queue; change their "options": {"queue": "..."} entries to use a
dedicated queue name like "blob-maintenance" (e.g., options: {"queue":
"blob-maintenance"}) so long-running blob maintenance is isolated, and ensure
any operator documentation or deployment configs create a worker with
appropriate concurrency (e.g., --concurrency=1) bound to the "blob-maintenance"
queue; update both task entries referencing
core.services.tiered_storage_tasks.offload_blobs_task and
core.services.blob_gc.gc_orphan_blobs_task.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Scalable email storage

4 participants