Skip to content

✨(tasks) switch from celery to dramatiq#560

Open
sylvinus wants to merge 1 commit into
mainfrom
dramatiq2
Open

✨(tasks) switch from celery to dramatiq#560
sylvinus wants to merge 1 commit into
mainfrom
dramatiq2

Conversation

@sylvinus
Copy link
Copy Markdown
Member

@sylvinus sylvinus commented Feb 23, 2026

Replaces #390

Summary by CodeRabbit

  • Chores
    • Infrastructure update: Replaced background task queue system with improved task processing framework
    • Enhanced task progress tracking and result retrieval for long-running operations
    • Improved logging capabilities with new command-line tools for monitoring worker and service logs
    • Updated project dependencies to streamline background processing

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Feb 23, 2026

📝 Walkthrough

Walkthrough

This pull request executes a comprehensive migration from Celery to Dramatiq as the task queue framework. The changes involve replacing Celery decorators and patterns with new Dramatiq-based utilities (register_task, cron_task), migrating progress tracking to a cache-based mechanism, updating task ownership tracking via instance methods, removing legacy Celery configuration and dependencies, and refactoring the worker CLI and test infrastructure throughout the application.

Changes

Cohort / File(s) Summary
Core Task Framework & Utilities
src/backend/core/utils.py, src/backend/core/tasks.py
Introduces Dramatiq integration layer with EagerBroker for testing, Task wrapper class with track_owner method, register_task and cron_task decorators, and new get_task_tracking/set_task_progress/get_task_progress utilities for cache-based progress and ownership tracking.
Core Task Decorators & Registry
src/backend/core/management/commands/run_task.py, src/backend/messages/__init__.py, src/backend/messages/celery_app.py (deleted)
Removes Celery app initialization; adds Django configuration setup at import time; replaces Celery task invocation with direct function calls in task runner.
Inbound/Outbound Message Tasks
src/backend/core/mda/inbound_tasks.py, src/backend/core/mda/outbound_tasks.py, src/backend/core/mda/outbound.py
Replaces Celery task decorators with register_task and cron_task; removes bound self parameter; introduces set_task_progress for progress reporting; adds explicit error handling and structured result returns; updates logging from Celery to standard Python logging.
Import Tasks - All File Types
src/backend/core/services/importer/eml_tasks.py, src/backend/core/services/importer/imap_tasks.py, src/backend/core/services/importer/mbox_tasks.py, src/backend/core/services/importer/pst_tasks.py
Converts from @celery_app.task(bind=True) to @register_task(queue="imports"); removes self parameter; replaces Celery logger with Python logging; migrates progress updates from self.update_state to set_task_progress calls; adds mailbox existence validation.
Import Service & IMAP Processing
src/backend/core/services/importer/imap.py, src/backend/core/services/importer/service.py
Replaces task_instance parameter with set_task_progress calls; removes Celery logger; updates register_task_owner calls to task.track_owner method; caps progress at 99% during processing; adjusts failure tracking logic.
Export Tasks
src/backend/core/services/exporter/tasks.py
Converts from bound Celery task to @register_task(queue="management"); removes self.update_state calls and replaces with set_task_progress; updates logging and error handling; maintains core export and S3 upload logic.
Search Tasks
src/backend/core/services/search/tasks.py
Replaces Celery task decorators with @register_task(queue="reindex") on all entry points; removes update_progress callback parameter; introduces set_task_progress for progress reporting with percentage calculations; adds explicit error handling for missing threads/mailboxes.
Task Detail View & Ownership Tracking
src/backend/core/api/viewsets/task.py, src/backend/core/admin.py, src/backend/core/api/viewsets/send.py
Introduces TASK_STATES constant and updates OpenAPI schema; replaces Celery AsyncResult and cache-based status with Dramatiq result retrieval and explicit result handling (ResultMissing/ResultFailure); replaces register_task_owner calls with task.track_owner method; implements progress-based response returns.
Worker CLI & Configuration
src/backend/worker.py, src/backend/messages/settings.py, src/backend/pyproject.toml, compose.yaml, Makefile
Removes Celery-specific CLI options (--loglevel, --disable-scheduler) and replaces with --verbosity (-v); adds discover_tasks_modules function; updates Django settings to use DRAMATIQ_BROKER (RedisBroker) with middleware stack instead of CELERY_* settings; adds three new Makefile log-viewing targets; updates dependencies to remove celery and django-celery packages, adds django-dramatiq and dramatiq-crontab.
Test Infrastructure
src/backend/core/tests/api/test_import_file_upload.py, src/backend/core/tests/api/test_draft_attachments.py, src/backend/core/tests/api/test_messages_create.py, src/backend/core/tests/api/test_messages_import.py
Replaces Celery AsyncResult mocking with cache-based task tracking (json serialization of ownership metadata); updates test invocations from apply/get pattern to direct function calls; adjusts MessageFactory usage with is_sender parameter for outbound drafts.
Test Infrastructure - Exporter & Importer
src/backend/core/tests/exporter/test_export_task.py, src/backend/core/tests/importer/test_file_import.py, src/backend/core/tests/importer/test_imap_import.py, src/backend/core/tests/importer/test_import_service.py, src/backend/core/tests/importer/test_pst_import.py
Removes mocking of task.update_state and Celery backend plumbing; replaces with set_task_progress mocks; updates test execution to call task functions directly instead of via Celery apply/get; simplifies mock scaffolding by removing intermediate state-update patching.
Test Infrastructure - Worker & Tasks
src/backend/core/tests/test_worker.py, src/backend/core/tests/mda/test_retry.py, src/backend/core/tests/mda/test_spam_processing.py, src/backend/core/tests/tasks/test_task_importer.py, src/backend/core/tests/tasks/test_task_send_message.py
Replaces Celery configuration tests with Dramatiq broker/autodiscovery validation; updates CLI argument tests from --loglevel/--disable-scheduler to --verbosity; replaces update_state mocking with set_task_progress mocks; updates task invocation from .apply().get() to direct function calls with progress assertion adjustments.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

Suggested reviewers

  • jbpenrath
  • sdemagny

Poem

🐰 Hop-hooray, the Celery fades away,
Dramatiq's bright dance leads the way,
Progress cached, tasks tracked with care,
Worker queues floating through the air. 🚀

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main objective of this large changeset: migrating the task queue system from Celery to Dramatiq across the entire backend codebase.
Docstring Coverage ✅ Passed Docstring coverage is 94.26% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch dramatiq2

Tip

Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Comment on lines +95 to +99
return Response({
"status": "FAILURE",
"result": None,
"error": str(exc),
})

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

Copilot Autofix

AI 3 months ago

In general, the fix is to avoid returning raw exception messages (or stack traces) directly to the client. Instead, log the detailed exception server-side and send the client a generic, non-sensitive error message or code while preserving the existing API contract where possible.

For this specific code, the best minimal fix is: in the except ResultFailure as exc: block, log the exception (including the stack trace) using the existing logger, and change the "error" field in the response to a generic message such as "Task failed" or "An internal error has occurred.". This keeps the response structure (status, result, error) and HTTP status code unchanged, so existing clients still receive "status": "FAILURE" and an error string, but without leaking internal exception details.

Concretely, in src/backend/core/api/viewsets/task.py, in the TaskDetailView.get method, update the except ResultFailure as exc: clause starting at line 94. Add a logger.exception(...) call before returning the response, and change "error": str(exc) to a generic error string. No new imports are needed because logging and logger are already defined at the top of the file.

Suggested changeset 1
src/backend/core/api/viewsets/task.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/src/backend/core/api/viewsets/task.py b/src/backend/core/api/viewsets/task.py
--- a/src/backend/core/api/viewsets/task.py
+++ b/src/backend/core/api/viewsets/task.py
@@ -92,10 +92,11 @@
         except ResultMissing:
             result_data = None
         except ResultFailure as exc:
+            logger.exception("Task %s failed when fetching result.", task_id)
             return Response({
                 "status": "FAILURE",
                 "result": None,
-                "error": str(exc),
+                "error": "Task failed.",
             })
 
         if result_data is not None:
EOF
@@ -92,10 +92,11 @@
except ResultMissing:
result_data = None
except ResultFailure as exc:
logger.exception("Task %s failed when fetching result.", task_id)
return Response({
"status": "FAILURE",
"result": None,
"error": str(exc),
"error": "Task failed.",
})

if result_data is not None:
Copilot is powered by AI and may make mistakes. Always verify output.
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (11)
src/backend/core/tests/api/test_messages_create.py (1)

1029-1047: ⚠️ Potential issue | 🟡 Minor

Consider adding is_sender=True to message on line 1029 to clarify the test's intent.

message2 (the success path) now requires is_sender=True because the send endpoint reaches the asynchronous send_message_task, which validates this flag. message (the EDITOR-denial path) currently lacks this flag, but the 403 response comes from the synchronous role check in IsAllowedToAccess.has_object_permission(), which runs before any is_sender validation.

Adding is_sender=True to message ensures both test phases have identical message state, making it unambiguous that only the mailbox role differs. This aligns with test clarity best practices.

Proposed fix
 message = factories.MessageFactory(
     thread=thread_access.thread,
     is_draft=True,
+    is_sender=True,
     sender=factories.ContactFactory(mailbox=mailbox),
 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/api/test_messages_create.py` around lines 1029 - 1047,
The test's denial path uses a MessageFactory-created message without
is_sender=True which makes the 403 come from
IsAllowedToAccess.has_object_permission rather than matching the async
send_message_task validation; update the first message creation (the variable
named message created via factories.MessageFactory) to include is_sender=True so
both the denial path and the success path share the same message state and only
the mailbox/role differs, ensuring clarity in the test intent and coverage.
src/backend/pyproject.toml (1)

80-95: ⚠️ Potential issue | 🟡 Minor

flower dev-dependency is Celery-specific and should be removed.

flower==2.0.1 is a web UI for Celery task monitoring and has no functionality outside a Celery stack. With celery removed as a production dependency, flower is a dead dependency that will fail if invoked.

🛠️ Proposed fix
 dev = [
     "django-extensions==4.1",
     "drf-spectacular-sidecar==2026.1.1",
-    "flower==2.0.1",
     "hypothesis==6.151.9",
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/pyproject.toml` around lines 80 - 95, Remove the dead
Celery-specific dev-dependency by deleting the "flower==2.0.1" entry from the
dev dependencies list in pyproject.toml; search for the exact token
flower==2.0.1 in the file and remove that line (and trailing comma adjustment if
necessary) so the remaining list remains valid TOML/JSON array syntax.
compose.yaml (1)

169-188: ⚠️ Potential issue | 🟠 Major

worker-ui service is broken — still references removed Celery/Flower stack.

Line 188 runs celery -A messages.celery_app flower --port=8803, but Celery was removed as a dependency and messages.celery_app was deleted. This service will fail to start.

Either remove this service entirely or replace it with a Dramatiq-compatible monitoring solution (e.g., django-dramatiq admin UI or a custom status endpoint).

🔧 Minimal fix — remove the broken service
-  worker-ui:
-    build:
-      context: src/backend
-      target: runtime-dev
-      args:
-        DOCKER_USER: ${DOCKER_USER:-1000}
-    user: ${DOCKER_USER:-1000}
-    depends_on:
-      - redis
-    environment:
-      - FLOWER_UNAUTHENTICATED_API=true
-      - DJANGO_CONFIGURATION=Development
-    env_file:
-      - env.d/development/backend.defaults
-      - env.d/development/backend.local
-    volumes:
-      - ./src/backend:/app
-    ports:
-      - "8903:8803"
-    command: celery -A messages.celery_app flower --port=8803
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@compose.yaml` around lines 169 - 188, The worker-ui service is broken because
it still runs the removed Celery/Flower command `celery -A messages.celery_app
flower --port=8803`; remove or replace this service: either delete the
`worker-ui` service block entirely, or update it to a Dramatiq-compatible
monitor (e.g., run a dramatiq admin UI or a custom status endpoint) and remove
the Celery-specific env/command references (`FLOWER_UNAUTHENTICATED_API`,
`celery -A messages.celery_app flower --port=8803`) and adjust volumes/ports
accordingly so the compose file only references existing Dramatiq components.
src/backend/core/services/search/tasks.py (1)

115-153: ⚠️ Potential issue | 🟠 Major

threads_viewer is a method; call it to get a queryset.

Without (), threads_qs is a bound method and .count() will fail.

🐛 Fix method invocation
-        threads_qs = models.Mailbox.objects.get(id=mailbox_id).threads_viewer
+        threads_qs = models.Mailbox.objects.get(id=mailbox_id).threads_viewer()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/search/tasks.py` around lines 115 - 153,
threads_viewer is a method on models.Mailbox but the code assigns the bound
method to threads_qs, so .count()/.iterator() will fail; update the assignment
to call the method (use
models.Mailbox.objects.get(id=mailbox_id).threads_viewer()) so threads_qs is a
queryset, leaving the subsequent .count(), .iterator(), index_thread() loop, and
set_task_progress() logic unchanged.
src/backend/core/services/importer/pst_tasks.py (1)

132-143: ⚠️ Potential issue | 🟡 Minor

Wrap the progress calculation to meet the 100‑char line limit.

The inline conditional is over the 100‑char limit and is harder to read.

🧹 Suggested formatting
-                        progress_pct = min(int((current_message / total_messages) * 100), 99) if total_messages > 0 else 0
+                        if total_messages > 0:
+                            progress_pct = min(
+                                int(current_message / total_messages * 100), 99
+                            )
+                        else:
+                            progress_pct = 0
As per coding guidelines, Follow Django/PEP 8 style with a 100-character line limit.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/importer/pst_tasks.py` around lines 132 - 143, The
inline conditional computing progress_pct is too long; split the expression
across multiple lines or compute it in two steps to respect the 100-char limit.
For example, assign a safe_divisor check or intermediate ratio variable using
current_message and total_messages, then compute progress_pct = min(int(ratio *
100), 99) if total_messages > 0 else 0, and keep the set_task_progress call
intact; update the code around the progress_pct assignment in pst_tasks.py
(symbols: progress_pct, current_message, total_messages, set_task_progress) to
use the wrapped/stepwise form.
src/backend/core/tests/importer/test_import_service.py (1)

159-171: ⚠️ Potential issue | 🟡 Minor

Ineffective mock — patch target doesn't intercept calls from eml_tasks.py.

eml_tasks.py uses from core.mda.inbound import deliver_inbound_message, creating a local binding in that module. Patching core.mda.inbound.deliver_inbound_message only replaces the name in core.mda.inbound, not the already-bound reference in eml_tasks. As a result mock_deliver is never called — the real deliver_inbound_message runs, and the test passes incidentally because is_import=True causes direct message creation anyway.

The same bug is present in test_import_file_eml_by_user_with_access_sync (line 238).

🐛 Proposed fix
-    with patch("core.mda.inbound.deliver_inbound_message", side_effect=mock_deliver):
+    with patch("core.services.importer.eml_tasks.deliver_inbound_message", side_effect=mock_deliver):

Apply the same correction at line 238.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/importer/test_import_service.py` around lines 159 -
171, The test tries to mock deliver_inbound_message but patches
core.mda.inbound; because eml_tasks.py does "from core.mda.inbound import
deliver_inbound_message" you must patch the symbol where it's used instead:
update the patch target to core.mda.eml_tasks.deliver_inbound_message (for both
the case around process_eml_file_task at the current block and the other test at
the second occurrence in test_import_file_eml_by_user_with_access_sync) so that
mock_deliver intercepts the call made by process_eml_file_task and returns True
as intended.
src/backend/core/tests/importer/test_imap_import.py (1)

176-223: ⚠️ Potential issue | 🟡 Minor

Patch target likely incorrect for set_task_progress mock.

import_imap_messages_task is defined in core.services.importer.imap_tasks, so patching core.services.importer.imap.set_task_progress won’t intercept the calls and the mock assertions will fail. Update both occurrences to target the task module.

🔧 Proposed fix
-    with patch("core.services.importer.imap.set_task_progress") as mock_set_progress:
+    with patch("core.services.importer.imap_tasks.set_task_progress") as mock_set_progress:
...
-    with patch("core.services.importer.imap.set_task_progress") as mock_set_progress:
+    with patch("core.services.importer.imap_tasks.set_task_progress") as mock_set_progress:

Also applies to: 281-339

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/importer/test_imap_import.py` around lines 176 - 223,
The test is patching set_task_progress in the wrong module; replace patch
targets that currently use "core.services.importer.imap.set_task_progress" with
the task module "core.services.importer.imap_tasks.set_task_progress" so the
mock intercepts calls from import_imap_messages_task; update both places in this
test file where set_task_progress is patched (the block around the successful
import assertions and the other similar block later) to reference
core.services.importer.imap_tasks.set_task_progress.
src/backend/core/mda/inbound_tasks.py (2)

274-281: ⚠️ Potential issue | 🟡 Minor

Missing Sentry reporting for unhandled exceptions.

The broad except Exception at line 274 logs and saves the error but does not call capture_exception(e). This is inconsistent with the project guideline requiring Sentry reporting.

Proposed fix
+from sentry_sdk import capture_exception
 ...
     except Exception as e:
+        capture_exception(e)
         logger.exception(
             "Error processing inbound message %s: %s", inbound_message_id, e
         )

Based on learnings: "Capture and report exceptions to Sentry; use capture_exception() for custom errors."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/mda/inbound_tasks.py` around lines 274 - 281, The exception
handler in inbound_tasks.py (the except Exception block that logs the error,
updates inbound_message.error_message, and returns the failure dict) is missing
Sentry reporting; add a call to capture_exception(e) (imported from sentry_sdk
or the project's Sentry wrapper) inside this except block—preferably immediately
after logger.exception and before updating inbound_message/save and returning—to
ensure all unhandled exceptions in the function (where inbound_message is
referenced) are reported to Sentry.

302-302: ⚠️ Potential issue | 🔴 Critical

Import timedelta from datetime module — timezone.timedelta does not exist.

Line 302 uses timezone.timedelta(minutes=5), but django.utils.timezone does not expose timedelta. This will raise AttributeError at runtime when the cron task fires. Import timedelta from datetime and use it directly.

Fix
 import re
 from typing import Any, Dict, Optional, Tuple
+from datetime import timedelta
 
 from django.core.cache import cache
 from django.utils import timezone
-    retry_threshold = timezone.now() - timezone.timedelta(minutes=5)
+    retry_threshold = timezone.now() - timedelta(minutes=5)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/mda/inbound_tasks.py` at line 302, The code sets
retry_threshold using timezone.timedelta which doesn't exist; import timedelta
from datetime and change the assignment in inbound_tasks.py so retry_threshold =
timezone.now() - timedelta(minutes=5) (keep the existing timezone.now() usage
and update the import to include from datetime import timedelta) to avoid the
AttributeError at runtime.
src/backend/core/mda/outbound_tasks.py (1)

21-74: ⚠️ Potential issue | 🟡 Minor

Missing Sentry reporting in the archive exception handler.

Line 59 catches a broad exception during archiving but only logs it. Per project guidelines, exceptions should also be reported to Sentry via capture_exception(). Also, if message.thread is None, the except block at line 63 will itself raise an AttributeError when accessing thread.id, masking the original error.

Proposed fix
+from sentry_sdk import capture_exception
 ...
         try:
             thread = message.thread
             models.Message.objects.filter(thread=thread).update(
                 is_archived=True, archived_at=timezone.now()
             )
             thread.update_stats()
             archived = True
             set_task_progress(90, {"message": "Thread archived"})
         except Exception as e:
             # Not critical, just log the error
+            capture_exception(e)
             logger.exception(
                 "Error in send_message_task when archiving thread %s after sending message %s: %s",
-                thread.id,
+                getattr(thread, "id", None),
                 message_id,
                 e,
             )

Based on learnings: "Capture and report exceptions to Sentry; use capture_exception() for custom errors."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/mda/outbound_tasks.py` around lines 21 - 74, In
send_message_task, the except block that handles archiving (around the thread
variable) only logs the error and then references thread.id which can raise a
new AttributeError; update this handler to call capture_exception(e) to report
to Sentry, avoid dereferencing thread if it may be None by using a safe
identifier (e.g., getattr(thread, "id", None) or message.thread_id) when
building the log message, and keep the logger.exception call but include the
safe id value; ensure the change is applied in the except block that currently
references thread.id and logger.exception so original exceptions are reported
and not masked.
src/backend/core/services/exporter/tasks.py (1)

439-450: ⚠️ Potential issue | 🟠 Major

Progress metadata structure in exporter tasks uses nested result dict, breaking API consumer expectations.

The task progress API endpoint (task.py line 120) extracts the progress message via metadata.get("message"), expecting a flat metadata structure. However, the exporter code at lines 439–450 (and elsewhere in mbox/eml tasks) nests metadata as {"result": {...}, "error": None}, causing the API to return "message": None to the frontend. This differs from flat patterns in outbound_tasks.py and search/tasks.py, which use {"message": "..."}. The inconsistency causes progress messages to disappear for exporter/importer tasks. Align all task progress metadata to a single structure—either flatten exporter metadata to {"message": "...", "result": {...}, ...} or update the API consumer to handle both patterns.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/exporter/tasks.py` around lines 439 - 450, The
exporter tasks are nesting progress under "result" which breaks the API consumer
expecting top-level "message"; update all set_task_progress calls in
exporter/mbox/eml task code (the call sites using set_task_progress that
currently pass {"result": {...}, "error": None}) to include a top-level
"message" (and other top-level fields like "error" if needed) while keeping the
detailed stats under "result" (e.g. {"message": "Counting messages", "result":
{...}, "error": None}); locate usages by searching for set_task_progress in
exporter tasks and adjust the metadata shape to match the flat pattern used by
outbound_tasks.py and search/tasks.py so task.py can read
metadata.get("message") correctly.
♻️ Duplicate comments (5)
src/backend/core/tests/importer/test_pst_import.py (3)

1218-1221: Same concern as above regarding direct actor calls returning Message objects rather than results.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/importer/test_pst_import.py` around lines 1218 - 1221,
The test is directly invoking process_pst_file_task and getting an actor Message
instead of the actual result; update the call to await the actor's response
(e.g., use the actor's ask/ask_nowait or .ask(...).get() / await .ask(...)
pattern your actor framework provides) so result contains the task return value
rather than a Message object; modify the invocation of process_pst_file_task to
use the actor await API and adjust assertions to use that awaited value.

1180-1183: Same concern as above regarding direct actor calls returning Message objects rather than results.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/importer/test_pst_import.py` around lines 1180 - 1183,
The test invokes the actor via process_pst_file_task directly which returns a
Message object instead of the actual result; update the test to unwrap the actor
response (for example by awaiting or calling the Message's result/get method or
using the actor ask API) so you assert on the real return value rather than the
Message placeholder — specifically change the call site around
process_pst_file_task(file_key=file_key, recipient_id=str(mailbox.id)) to
retrieve the underlying result before making assertions.

1261-1264: Same concern as above regarding direct actor calls returning Message objects rather than results.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/importer/test_pst_import.py` around lines 1261 - 1264,
The test is calling the actor method process_pst_file_task(...) directly which
returns a Message object instead of the actual result; change the call to invoke
the actor correctly and retrieve its return value (e.g., use the actor's
asynchronous invocation API and await the result or call the remote/invoke
method and then fetch the result) so that result holds the task output rather
than a Message; update the invocation that passes file_key and recipient_id
(mailbox.id) to use the actor's proper remote/async call pattern and await/get
the returned value before asserting.
src/backend/core/services/importer/eml_tasks.py (1)

9-10: Same logger-between-imports pattern as imap_tasks.py.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/importer/eml_tasks.py` around lines 9 - 10, The
logger definition (logger = logging.getLogger(__name__)) is placed between
import statements in eml_tasks.py which matches the problematic pattern from
imap_tasks.py; move the logging import and the logger =
logging.getLogger(__name__) line so that all import statements are grouped
together at the top and the logger is defined immediately after the import block
(i.e., ensure logging is imported with the other imports and logger is declared
after imports), matching the placement used in imap_tasks.py and avoiding
logger-between-imports ordering issues.
src/backend/core/mda/inbound_tasks.py (1)

287-289: Same decorator order question as flagged in outbound_tasks.py.

@cron_task is outer, @register_task is inner — same pattern as the outbound tasks. See the comment on outbound_tasks.py line 77-79 for details.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/mda/inbound_tasks.py` around lines 287 - 289, The decorators
on process_inbound_messages_queue_task are in the wrong order: `@cron_task` is
outer and `@register_task` is inner; swap them so `@register_task` is outer and
`@cron_task` is directly above the function (i.e., apply register_task first, then
cron_task) to match the pattern used in outbound tasks and ensure the task is
registered before the cron wrapper wraps it (see
process_inbound_messages_queue_task, cron_task, register_task and the similar
pattern in outbound_tasks.py).
🧹 Nitpick comments (16)
src/backend/core/utils.py (2)

32-50: EagerBroker doesn't store failure results via the Results middleware.

rm.after_process_message is called on the success path (line 43–44) but not in the finally/exception path. Dramatiq's after_process_message signature accepts an exception keyword argument; failing to call it on error means the Results backend never records task failures when using EagerBroker. Tests that verify failure status via message.get_result() would always see ResultMissing instead of ResultFailure.

♻️ Proposed fix
     try:
         result = actor.fn(*message.args, **message.kwargs)
         if rm:
             rm.after_process_message(self, message, result=result)
+    except Exception as exc:
+        if rm:
+            rm.after_process_message(self, message, exception=exc)
+        raise
     finally:
         if cm:
             cm.after_process_message(self, message)
             if prev is not None:
                 cm.before_process_message(self, prev)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/utils.py` around lines 32 - 50, The EagerBroker.enqueue
implementation calls Results.after_process_message only on the success path, so
failures aren't recorded; update enqueue to call rm.after_process_message in the
exception path too by catching the exception, calling
rm.after_process_message(self, message, exception=exc) before re-raising, and
keep the existing call with result=result on success; refer to the enqueue
method, the rm variable (Results middleware), and CurrentMessage handling to
ensure middleware before/after calls remain balanced when propagating the
exception.

20-21: TASK_TRACKING_CACHE_TTL hardcoded to 30 days — consider tying it to settings.

The comment notes it "matches DRAMATIQ_RESULT_BACKEND TTL", but if that setting changes, these two will silently diverge. Exposing this as a Django setting (or deriving it from the result backend configuration) would keep them in sync.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/utils.py` around lines 20 - 21, TASK_TRACKING_CACHE_TTL is
hardcoded to 30 days and can drift from the DRAMATIQ_RESULT_BACKEND TTL; change
it to read from configuration instead. Update src/backend/core/utils.py to
derive TASK_TRACKING_CACHE_TTL from a Django setting (e.g.,
settings.TASK_TRACKING_CACHE_TTL) or compute it from the DRAMATIQ_RESULT_BACKEND
TTL (fetch the backend TTL from settings or the dramatiq config) and fall back
to the existing 86400 * 30 default; keep TASK_PROGRESS_CACHE_TIMEOUT unchanged
or also expose it via settings if desired. Ensure you reference and use the
existing symbol TASK_TRACKING_CACHE_TTL in the module so callers remain
unchanged.
src/backend/core/tests/tasks/test_task_send_message.py (1)

69-71: Stale # pylint: disable=no-value-for-parameter comments.

These suppressions were needed for Celery bound tasks (where self was an explicit parameter). With Dramatiq, Actor.__call__ accepts *args, **kwargs, so pylint won't emit no-value-for-parameter here. The same suppression appears on lines 115, 154, and 176. All four can be removed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/tasks/test_task_send_message.py` around lines 69 - 71,
Remove the now-stale pylint suppression comments that disable
no-value-for-parameter for Dramatiq actors: delete the "# pylint:
disable=no-value-for-parameter" suffix from the call sites of send_message_task
in this test file (the occurrences at the call with str(draft_message.id),
must_archive=True and the other three calls referenced on lines 115, 154, and
176). Ensure the send_message_task calls remain unchanged otherwise; just remove
the pylint disable comments so the code no longer contains unnecessary
suppressions.
src/backend/core/services/importer/imap.py (1)

21-26: import logging and logger declaration are misplaced among imports.

logging is a stdlib module and must appear in the stdlib import block (before from django.conf import settings), and logger = logging.getLogger(__name__) should follow all imports, not sit between them. The project's ruff isort config (section-order = ["future", "standard-library", "django", ...]) will flag this.

♻️ Proposed fix

Move the import logging line up to be with the other stdlib imports (e.g., after import time):

 import base64
 import codecs
 import imaplib
+import logging
 import re
 import socket
 import ssl
 import time
 from typing import Any, Dict, List, Optional, Tuple
 
 from django.conf import settings
 
-import logging
-logger = logging.getLogger(__name__)
-
 from core.mda.inbound import deliver_inbound_message
 from core.mda.rfc5322 import parse_email_message
 from core.utils import set_task_progress
+
+logger = logging.getLogger(__name__)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/importer/imap.py` around lines 21 - 26, The import
order is wrong: move the "import logging" statement into the standard-library
import block (e.g., alongside other stdlib imports such as "import time") so it
appears before Django and third-party imports, and relocate "logger =
logging.getLogger(__name__)" to the bottom of the import section (after all
imports) so the module-level logger is declared only after imports are complete;
update the file's import ordering around the existing "from django.conf import
settings" and the lines referencing deliver_inbound_message,
parse_email_message, and set_task_progress accordingly.
src/backend/core/management/commands/run_task.py (1)

1-7: Stale "Celery" references throughout the file's documentation.

The module docstring, class docstring (line 19), help text (line 22), and add_arguments docstring (line 33) all still describe this as a "Celery task" runner. These should be updated to reflect the Dramatiq migration.

♻️ Suggested docstring updates
-"""
-Management command to run arbitrary Celery tasks synchronously.
-
-This command provides a Django interface to run Celery tasks with the same
-CLI flags as the main Celery CLI, but executes them synchronously instead
-of queuing them as background tasks.
-"""
+"""
+Management command to run arbitrary tasks synchronously.
+
+This command provides a Django interface to run Dramatiq tasks synchronously
+instead of queuing them as background tasks.
+"""
-class Command(BaseCommand):
-    """Run arbitrary Celery tasks synchronously."""
+class Command(BaseCommand):
+    """Run arbitrary Dramatiq tasks synchronously."""
 
     help = """
-    Run arbitrary Celery tasks synchronously.
+    Run arbitrary tasks synchronously.
-        parser.add_argument("task_name", help="Name of the Celery task to run")
+        parser.add_argument("task_name", help="Name of the task to run")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/management/commands/run_task.py` around lines 1 - 7, Update
all stale "Celery" references in this module to reflect the Dramatiq migration:
change the module-level docstring, the Command class docstring, the Command.help
text, and the add_arguments docstring (and any inline comments mentioning
Celery) to refer to Dramatiq (e.g., "Dramatiq actors/tasks" and "Dramatiq CLI
flags" as appropriate) and adjust wording where necessary so the docs accurately
describe running Dramatiq actors synchronously instead of queuing; search for
the Command class, its help attribute, and the add_arguments method to locate
and update each string.
src/backend/worker.py (1)

162-186: Validate --concurrency is positive before passing it to Dramatiq.

A user can pass 0 or a negative number; failing early yields clearer errors than letting Dramatiq fail later.

♻️ Suggested validation
-    dramatiq_args = [
+    processes = args.concurrency or 4
+    if processes < 1:
+        sys.stderr.write("Error: --concurrency must be >= 1\n")
+        sys.exit(1)
+
+    dramatiq_args = [
         "dramatiq",
         "--path", ".",
-        "--processes", str(args.concurrency or 4),
+        "--processes", str(processes),
         "--threads", "1",
         "--worker-shutdown-timeout", "600000",
     ]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/worker.py` around lines 162 - 186, Validate that args.concurrency
is a positive integer before building dramatiq_args: check args.concurrency
(used when constructing dramatiq_args and passed to dramatiq.cli.main()) and if
it's None, leave default behavior; if it's provided and <= 0, raise a clear
error (e.g., argparse error or SystemExit with a descriptive message) or coerce
to a minimum of 1. Perform this check right before creating dramatiq_args so the
invalid value is rejected early and the error message references "--concurrency"
and args.concurrency for clarity.
src/backend/core/tests/api/test_import_file_upload.py (1)

75-87: Split owner vs non-owner checks into separate tests.

This test now exercises two distinct paths; splitting keeps each test single-purpose and reduces assertion noise.

As per coding guidelines, Unit tests should focus on a single use case, keep assertions minimal, and cover all possible cases.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/api/test_import_file_upload.py` around lines 75 - 87,
Split the combined assertions into two focused tests: one that verifies
non-owner access is forbidden and another that verifies the owner can access the
task and sees the import result. Extract common setup (creating users, URL, and
result_data) into a shared fixture or test setup, then implement
test_non_owner_cannot_access where you create APIClient(),
force_authenticate(user=user2) and assert GET(url) returns
status.HTTP_403_FORBIDDEN, and implement test_owner_can_access_and_sees_result
where you create APIClient(), force_authenticate(user=user1),
mock.patch("dramatiq.Message.get_result", return_value=result_data) and assert
GET(url) returns status.HTTP_200_OK and response.data["result"]["imported"] ==
42; keep client1/client2, url, and result_data identifiers unchanged to locate
the code.
src/backend/core/tests/api/test_messages_import.py (1)

185-202: Remove leftover .status = "PENDING" mock attribute.

mock_task.return_value.status = "PENDING" is set but never asserted in test_api_import_mbox_async, test_api_import_pst_file_async, and test_api_import_pst_autodetect. This is a Celery remnant — CeleryCompatActor doesn't expose a status attribute after .delay().

♻️ Proposed cleanup (same pattern in the other two async tests)
     mock_task.return_value.id = "fake-task-id"
-    mock_task.return_value.status = "PENDING"
     response = api_client.post(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/api/test_messages_import.py` around lines 185 - 202,
Remove the leftover Celery remnant setting mock_task.return_value.status =
"PENDING" in the async import tests; locate the occurrences in
test_api_import_mbox_async, test_api_import_pst_file_async, and
test_api_import_pst_autodetect (the lines that set mock_task.return_value.status
= "PENDING") and delete them so the mocked task only sets the id and not an
unused status attribute.
src/backend/core/services/importer/imap_tasks.py (1)

6-7: Move logger initialization after all imports.

Placing logger = logging.getLogger(__name__) between import groups breaks the PEP 8 convention of grouping all imports before any module-level initializations. The same pattern appears in eml_tasks.py lines 9–10.

♻️ Proposed fix
 from typing import Any, Dict
 
-import logging
-logger = logging.getLogger(__name__)
-
+import logging
+
 from core.models import Mailbox
 from core.utils import register_task, set_task_progress
 
 from .imap import (
     ...
 )
+
+logger = logging.getLogger(__name__)

As per coding guidelines: "Follow Django/PEP 8 style with a 100-character line limit."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/importer/imap_tasks.py` around lines 6 - 7, The
module-level logger initialization is placed between import groups which
violates PEP8 import grouping; move the statement logger =
logging.getLogger(__name__) so it appears after all imports in
src/backend/core/services/importer/imap_tasks.py (and similarly adjust
eml_tasks.py where present), keeping all standard/library/third-party/local
imports grouped first and then declare the logger at module scope immediately
after the final import.
src/backend/core/tests/importer/test_file_import.py (1)

376-381: set_task_progress will emit a warning log in these tests.

test_import_message_to_different_mailbox_same_domain (and test_import_message_with_from_equal_to_mailbox_sets_is_sender at line 440) call process_eml_file_task without mocking set_task_progress. Outside a Dramatiq actor context CurrentMessage.get_current_message() returns None, causing a "set_task_progress called outside of a dramatiq actor" warning for each run. No functional failure, but it pollutes test output.

♻️ Suppress the warning in isolation tests
-    with patch("core.services.importer.eml_tasks.storages") as mock_storages:
+    with patch("core.services.importer.eml_tasks.storages") as mock_storages, \
+         patch("core.services.importer.eml_tasks.set_task_progress"):
         mock_storages.__getitem__.return_value = mock_storage
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/importer/test_file_import.py` around lines 376 - 381,
The tests call process_eml_file_task outside a Dramatiq actor so
set_task_progress emits a warning; to suppress it patch out set_task_progress in
these tests. Update the tests (e.g.,
test_import_message_to_different_mailbox_same_domain and
test_import_message_with_from_equal_to_mailbox_sets_is_sender) to wrap the call
to process_eml_file_task with a patch of
core.services.importer.eml_tasks.set_task_progress (or add a patch decorator)
that replaces it with a no-op/mock, leaving the rest of the test unchanged.
src/backend/core/mda/outbound_tasks.py (1)

162-179: Progress update fires only at batch boundaries, not on the last message.

The condition if index % batch_size == 0 means the final progress update inside the loop will occur at the last multiple of batch_size, potentially skipping the tail. Consider also updating when index == total_messages - 1 (or after the loop) so the final count is accurate.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/mda/outbound_tasks.py` around lines 162 - 179, The progress
update currently triggers only when index % batch_size == 0, which can skip the
final tail batch; modify the logic in the loop that iterates
messages_to_process.iterator (using variables index, batch_size, total_messages,
processed_count, success_count, error_count) to also call set_task_progress when
index == total_messages - 1 (or otherwise ensure a final set_task_progress after
the loop) so the final progress reflects all processed messages and updates the
"processed_messages"/counts for the last partial batch.
src/backend/core/services/exporter/tasks.py (3)

437-605: No set_task_progress(100, ...) on completion.

Progress reporting reaches 95 at line 548 (notification step) but never hits 100 on success. If any UI polls progress to determine completion, it will remain at 95 even after the task returns. Consider adding a final set_task_progress(100, ...) before the success return on line 586.

Proposed addition before the return
         return {"status": "SUCCESS", "result": result, "error": None}

+        set_task_progress(100, {"result": result, "error": None})
         return {"status": "SUCCESS", "result": result, "error": None}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/exporter/tasks.py` around lines 437 - 605, The
progress never reaches 100% on successful export; before the success return
(after generate_presigned_url and _create_notification_message) call
set_task_progress(100, {...}) with the final result payload (message_status
"Export completed", total_messages, exported_count, skipped_count, s3_key) so
the UI sees completion; update the same result dict you return and invoke
set_task_progress(100, {"result": result, "error": None}) just prior to
returning {"status": "SUCCESS", "result": result, "error": None}.

402-403: user_id parameter is unused in the function body.

The # pylint: disable=unused-argument suppresses the warning, but user_id is never referenced inside export_mailbox_task. If ownership tracking now happens at the call site via task.track_owner(user_id), consider documenting why the parameter is still present (e.g., for result-backend metadata or future use), or remove it if callers have been updated.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/exporter/tasks.py` around lines 402 - 403, The
export_mailbox_task function has an unused user_id parameter
(export_mailbox_task) suppressed by pylint; either remove the parameter from the
signature and update callers, or explicitly use it for metadata/ownership (e.g.,
pass into task.track_owner(user_id) or include in result-backend/task metadata)
and add a short docstring comment explaining why it is needed; update any
callers and tests accordingly so the signature and usage remain consistent.

15-16: Import ordering: logging placed between Django and local imports.

import logging (stdlib) should precede all third-party/Django imports per PEP 8 / isort convention.

Suggested reordering
+import logging
 import gzip
 import html
 import io
 import re
+
 from datetime import datetime, timezone
 from email.message import EmailMessage
 from email.utils import format_datetime
 from typing import Any, Dict
 
 from django.conf import settings
 from django.core.files.storage import storages
 
-import logging
-logger = logging.getLogger(__name__)
-
 from core.api.utils import generate_presigned_url
 ...
+
+logger = logging.getLogger(__name__)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/exporter/tasks.py` around lines 15 - 16, Move the
standard-library import `import logging` to the top of the import block so it
appears before any third-party/Django and local imports (PEP8/isort order), then
keep the existing `logger = logging.getLogger(__name__)` line unchanged; this
affects the module-level imports in the file and ensures `logging` is declared
before Django imports like any other stdlib module.
src/backend/core/mda/inbound_tasks.py (2)

318-329: .delay() vs .send() — docstring says .send() but code uses .delay().

Line 293 documents the call as process_inbound_message_task.send() but line 321 uses .delay(). Presumably CeleryCompatActor aliases both, but the inconsistency could confuse future contributors. Consider aligning one way.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/mda/inbound_tasks.py` around lines 318 - 329, The loop uses
process_inbound_message_task.delay(...) while the docstring (and
CeleryCompatActor abstraction) references .send(...), creating an inconsistency;
standardize to a single API by replacing the .delay calls in the old_messages
retry loop with the documented .send(...) (or alternatively change the docstring
to document .delay if that is preferred) and ensure process_inbound_message_task
(CeleryCompatActor) supports the chosen alias so callers and docs match (update
the loop that iterates old_messages and the docstring referencing
process_inbound_message_task accordingly).

5-19: Import ordering: logging and requests are placed after Django imports.

Standard Python convention (and isort defaults) place stdlib imports (logging) before third-party imports (django.*, requests), and third-party imports before local imports (core.*). Currently logging (line 11) and requests (line 16) are interleaved with local imports.

Suggested reordering
 import re
+import logging
 from typing import Any, Dict, Optional, Tuple
 
 from django.core.cache import cache
 from django.utils import timezone
 
-import logging
+import requests
 
 from core import models
 from core.mda.inbound_create import _create_message_from_inbound
 from core.mda.rfc5322 import parse_email_message
-import requests
 from core.utils import cron_task, register_task
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/mda/inbound_tasks.py` around lines 5 - 19, The import
ordering mixes stdlib and third-party/local imports; move the stdlib import
"logging" to the top group, keep third-party Django imports
("django.core.cache", "django.utils") in the middle group, then third-party
"requests" after Django, and finally local imports from "core" (e.g., models,
_create_message_from_inbound, parse_email_message, cron_task, register_task);
adjust the import blocks around the module-level symbols logger and any use of
requests so imports follow the stdlib → third-party → local convention.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 516d9df and 4d5be92.

⛔ Files ignored due to path filters (1)
  • src/backend/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (38)
  • Makefile
  • compose.yaml
  • src/backend/core/admin.py
  • src/backend/core/api/viewsets/send.py
  • src/backend/core/api/viewsets/task.py
  • src/backend/core/management/commands/run_task.py
  • src/backend/core/mda/inbound_tasks.py
  • src/backend/core/mda/outbound.py
  • src/backend/core/mda/outbound_tasks.py
  • src/backend/core/services/exporter/tasks.py
  • src/backend/core/services/importer/eml_tasks.py
  • src/backend/core/services/importer/imap.py
  • src/backend/core/services/importer/imap_tasks.py
  • src/backend/core/services/importer/mbox_tasks.py
  • src/backend/core/services/importer/pst_tasks.py
  • src/backend/core/services/importer/service.py
  • src/backend/core/services/search/tasks.py
  • src/backend/core/tasks.py
  • src/backend/core/tests/api/test_draft_attachments.py
  • src/backend/core/tests/api/test_import_file_upload.py
  • src/backend/core/tests/api/test_messages_create.py
  • src/backend/core/tests/api/test_messages_import.py
  • src/backend/core/tests/exporter/test_export_task.py
  • src/backend/core/tests/importer/test_file_import.py
  • src/backend/core/tests/importer/test_imap_import.py
  • src/backend/core/tests/importer/test_import_service.py
  • src/backend/core/tests/importer/test_pst_import.py
  • src/backend/core/tests/mda/test_retry.py
  • src/backend/core/tests/mda/test_spam_processing.py
  • src/backend/core/tests/tasks/test_task_importer.py
  • src/backend/core/tests/tasks/test_task_send_message.py
  • src/backend/core/tests/test_worker.py
  • src/backend/core/utils.py
  • src/backend/messages/__init__.py
  • src/backend/messages/celery_app.py
  • src/backend/messages/settings.py
  • src/backend/pyproject.toml
  • src/backend/worker.py
💤 Files with no reviewable changes (1)
  • src/backend/messages/celery_app.py

Comment on lines +90 to +99
try:
result_data = message.get_result(block=False)
except ResultMissing:
result_data = None
except ResultFailure as exc:
return Response({
"status": "FAILURE",
"result": None,
"error": str(exc),
})
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Find the project's dependency file to check Dramatiq version
fd -e "requirements.txt" -e "pyproject.toml" -e "poetry.lock" -e "Pipfile" | head -20

Repository: suitenumerique/messages

Length of output: 49


🏁 Script executed:

# Check the actual code at the specified location
cat -n src/backend/core/api/viewsets/task.py | sed -n '85,105p'

Repository: suitenumerique/messages

Length of output: 910


🏁 Script executed:

# Search for any existing Sentry imports in the backend
rg "from sentry_sdk import|import sentry_sdk" src/backend/ -A 2

Repository: suitenumerique/messages

Length of output: 1588


🏁 Script executed:

# Check if there's a logger already configured in this file
rg "import logging|logger = " src/backend/core/api/viewsets/task.py -A 2

Repository: suitenumerique/messages

Length of output: 140


🏁 Script executed:

# Find dependency files
find . -name "requirements*.txt" -o -name "pyproject.toml" -o -name "setup.py" | grep -v __pycache__ | head -10

Repository: suitenumerique/messages

Length of output: 226


🏁 Script executed:

# Check git history or setup files for Dramatiq version
rg "dramatiq" . --max-count=5 -A 2 | head -30

Repository: suitenumerique/messages

Length of output: 2047


🌐 Web query:

Dramatiq 2.0.1 ResultFailure exception __str__ what does it return

💡 Result:

In Dramatiq 2.0.1, ResultFailure does not override __str__; it inherits it from dramatiq.errors.DramatiqError. That __str__ returns:

  • str(self.message) if it’s truthy, otherwise
  • repr(self.message) (fallback for falsy / empty messages). [1]

ResultFailure.__init__ just stores the message plus orig_exc_type / orig_exc_msg; those extra fields do not affect __str__ directly. [2]

So str(e) where e is a ResultFailure is essentially whatever message Dramatiq constructed when raising it (commonly shown as "actor raised <ExceptionType>: <ExceptionMessage>"). [3]

Sources:
[1] DramatiqError __str__ implementation https://dramatiq.io/_modules/dramatiq/errors.html
[2] ResultFailure definition https://dramatiq.io/_modules/dramatiq/results/errors.html
[3] Docs example showing the message format https://dramatiq.io/reference.html


🏁 Script executed:

# Check for similar exception handling patterns in the codebase
rg "ResultFailure|ResultMissing" src/backend/ -B 3 -A 5

Repository: suitenumerique/messages

Length of output: 1355


🏁 Script executed:

# Look for other places where exceptions are returned to clients in viewsets
rg "return Response.*str\(exc\)" src/backend/ -B 2 -A 2

Repository: suitenumerique/messages

Length of output: 49


🏁 Script executed:

# Check the imports in the task viewset file
cat -n src/backend/core/api/viewsets/task.py | head -30

Repository: suitenumerique/messages

Length of output: 991


Capture exceptions internally and return a generic error message to clients.

str(exc) exposes the Dramatiq task failure message (formatted as "actor raised <ExceptionType>: <ExceptionMessage>") which can leak internal exception details to clients. Log and capture the exception, then return a generic error response.

Add the import and update the exception handler:

Fix
import dramatiq
+from sentry_sdk import capture_exception
        except ResultFailure as exc:
+            logger.exception("Task %s failed", task_id)
+            capture_exception(exc)
             return Response({
                 "status": "FAILURE",
                 "result": None,
-                "error": str(exc),
+                "error": "Task execution failed.",
             })
🧰 Tools
🪛 GitHub Check: CodeQL

[warning] 95-99: Information exposure through an exception
Stack trace information flows to this location and may be exposed to an external user.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/api/viewsets/task.py` around lines 90 - 99, The
ResultFailure handler currently returns str(exc) which can leak internal
exception details; update the exception handling around message.get_result (the
except ResultFailure as exc block) to log the full exception internally (e.g.,
logger.exception or process_logger.exception after importing the logger) and
return a generic client-safe Response payload (e.g., {"status": "FAILURE",
"result": None, "error": "Task failed"}). Add the necessary logger import and
replace the returned error string with the generic message while ensuring the
original exception is captured in the internal log call.

Comment on lines +113 to +124
# Check if we have progress data for this task
progress_data = get_task_progress(task_id)
if progress_data:
return Response(
{
"status": "PROGRESS",
"result": None,
"error": None,
"progress": progress_data.get("progress"),
"message": progress_data.get("metadata", {}).get("message"),
"timestamp": progress_data.get("timestamp"),
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Expose progress metadata in PROGRESS responses.

set_task_progress stores metadata (e.g., mbox tasks put counts under metadata["result"]), but the API currently drops it and returns message=None. This hides meaningful progress details from clients.

🔧 Proposed fix
         progress_data = get_task_progress(task_id)
         if progress_data:
+            metadata = progress_data.get("metadata", {}) or {}
             return Response(
                 {
                     "status": "PROGRESS",
-                    "result": None,
-                    "error": None,
+                    "result": metadata or None,
+                    "error": metadata.get("error"),
                     "progress": progress_data.get("progress"),
-                    "message": progress_data.get("metadata", {}).get("message"),
+                    "message": metadata.get("message")
+                    or metadata.get("result", {}).get("message_status"),
                     "timestamp": progress_data.get("timestamp"),
                 }
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/api/viewsets/task.py` around lines 113 - 124, The PROGRESS
response drops useful metadata set by set_task_progress; update the block that
uses get_task_progress (in the viewset handling task status) to include the
metadata payload in the Response: return the metadata object (e.g.,
progress_data.get("metadata")) and surface any nested fields like
metadata.get("result") and metadata.get("message") instead of returning
message=None; keep existing keys (status, result, error, progress, timestamp)
but populate result/message from progress_data["metadata"] when present so
clients receive the stored progress details.

from core.models import Mailbox

from messages.celery_app import app as celery_app
from core.utils import register_task, set_task_progress
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, let's find all occurrences of set_task_progress in the codebase
rg -n "set_task_progress" --type=py

Repository: suitenumerique/messages

Length of output: 4379


🏁 Script executed:

# Now let's examine the imap_tasks.py file to see what functions are defined and used
cat -n src/backend/core/services/importer/imap_tasks.py

Repository: suitenumerique/messages

Length of output: 6364


🏁 Script executed:

# Let's also look at the other importer task files mentioned for comparison
ls -la src/backend/core/services/importer/

Repository: suitenumerique/messages

Length of output: 798


Remove the unused set_task_progress import from line 10.

set_task_progress is imported in imap_tasks.py but is never called anywhere in this file. Unlike other importer tasks (eml_tasks.py, mbox_tasks.py, pst_tasks.py), which call it directly, the IMAP importer delegates progress reporting to process_folder_messages in imap.py, which imports set_task_progress independently. Remove the unused import to clean up dead code.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/services/importer/imap_tasks.py` at line 10, Remove the
unused import set_task_progress from the top-level import in imap_tasks.py: keep
register_task but delete set_task_progress since progress reporting for IMAP is
handled by process_folder_messages in imap.py which imports set_task_progress
itself; update the import line that currently references both register_task and
set_task_progress to only import register_task.

Comment on lines +136 to 139
# Create a draft message (is_sender=True since this is an outbound draft)
draft = factories.MessageFactory(
thread=thread, sender=sender, is_draft=True, subject="Existing draft"
thread=thread, sender=sender, is_draft=True, is_sender=True, subject="Existing draft"
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

rg -n --type=py 'is_sender' --glob '!**/tests/**' src/backend/

Repository: suitenumerique/messages

Length of output: 4778


🏁 Script executed:

# Check the send-message view/endpoint to see if it validates or checks is_sender
fd -t f '\.py$' src/backend/ --glob '!**/tests/**' | xargs rg -l 'send.*message|send.*draft' | head -10

Repository: suitenumerique/messages

Length of output: 659


🏁 Script executed:

# Examine the test at line 534 and surrounding context
sed -n '530,545p' src/backend/core/tests/api/test_draft_attachments.py

Repository: suitenumerique/messages

Length of output: 665


🏁 Script executed:

# Check the send-message endpoint implementation
sed -n '150,180p' src/backend/core/api/viewsets/message.py

Repository: suitenumerique/messages

Length of output: 1384


🏁 Script executed:

# Check the serializer validation around line 842
sed -n '830,860p' src/backend/core/api/serializers.py

Repository: suitenumerique/messages

Length of output: 1157


🏁 Script executed:

# Check the full test method at line 534 to see what it expects
sed -n '530,570p' src/backend/core/tests/api/test_draft_attachments.py

Repository: suitenumerique/messages

Length of output: 1807


🏁 Script executed:

# Check draft.py to see how drafts are created with is_sender
sed -n '380,395p' src/backend/core/mda/draft.py

Repository: suitenumerique/messages

Length of output: 483


🏁 Script executed:

# Find the send-message endpoint implementation
rg -n 'send-message|send_message' src/backend/core/api/ --type=py

Repository: suitenumerique/messages

Length of output: 411


🏁 Script executed:

# Look for the send endpoint in viewsets
rg -n 'def send' src/backend/core/api/viewsets/ --type=py -A 3

Repository: suitenumerique/messages

Length of output: 449


🏁 Script executed:

# Check if there's a send serializer
fd -t f -name '*.py' src/backend/core/api/ | xargs rg -l 'send.*serializer' -i

Repository: suitenumerique/messages

Length of output: 476


🏁 Script executed:

# Examine the send viewset implementation
cat -n src/backend/core/api/viewsets/send.py

Repository: suitenumerique/messages

Length of output: 5695


🏁 Script executed:

# Check send_message_task in outbound_tasks
sed -n '110,150p' src/backend/core/mda/outbound_tasks.py

Repository: suitenumerique/messages

Length of output: 1287


🏁 Script executed:

# Check prepare_outbound_message function to see if it validates is_sender
rg -n 'def prepare_outbound_message' src/backend/core/mda/outbound.py -A 30

Repository: suitenumerique/messages

Length of output: 1297


🏁 Script executed:

# Also check the test expectations more carefully - what does the test expect when sending a draft without is_sender?
sed -n '520,570p' src/backend/core/tests/api/test_draft_attachments.py

Repository: suitenumerique/messages

Length of output: 2166


Align is_sender=True on outbound draft at line 534 with the change at line 138

The is_sender=True addition at line 138 is correct and establishes the pattern that outbound drafts should be marked as sender-side. However, test_draft_send_with_attachments_exceeding_size_limit at line 534 creates an identically structured outbound draft without it:

draft = factories.MessageFactory(
    thread=thread, sender=sender, is_draft=True, subject="Test draft"  # is_sender=True missing
)

While the test passes (the size-limit check fires before any is_sender consideration), the draft is semantically inconsistent with the outbound-draft convention. For consistency, add is_sender=True to line 534 to match line 138.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/core/tests/api/test_draft_attachments.py` around lines 136 - 139,
In the test_draft_send_with_attachments_exceeding_size_limit test in
src/backend/core/tests/api/test_draft_attachments.py, update the MessageFactory
call that creates the outbound draft to include is_sender=True so it matches the
outbound-draft convention used elsewhere (e.g., the earlier draft creation);
specifically, add is_sender=True to the factories.MessageFactory(...) invocation
inside test_draft_send_with_attachments_exceeding_size_limit.

Comment thread src/backend/worker.py
Comment on lines +23 to +31
import multiprocessing
import os
import sys

# Workaround for Dramatiq + Python 3.14: forkserver (the new default) breaks
# Dramatiq's Canteen shared-memory mechanism, causing worker processes to never
# consume messages. See https://github.com/Bogdanp/dramatiq/issues/701
# Must be set before dramatiq.cli.main() spawns worker processes.
multiprocessing.set_start_method("fork", force=True)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Guard the fork-only start method for unsupported platforms.

set_start_method("fork") raises on platforms without fork (e.g., Windows). A small guard avoids hard failures outside Linux environments.

🛠️ Suggested guard
-multiprocessing.set_start_method("fork", force=True)
+if "fork" in multiprocessing.get_all_start_methods():
+    multiprocessing.set_start_method("fork", force=True)
+else:
+    logging.getLogger(__name__).warning(
+        "Multiprocessing start method 'fork' is unavailable; using default."
+    )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import multiprocessing
import os
import sys
# Workaround for Dramatiq + Python 3.14: forkserver (the new default) breaks
# Dramatiq's Canteen shared-memory mechanism, causing worker processes to never
# consume messages. See https://github.com/Bogdanp/dramatiq/issues/701
# Must be set before dramatiq.cli.main() spawns worker processes.
multiprocessing.set_start_method("fork", force=True)
import logging
import multiprocessing
import os
import sys
# Workaround for Dramatiq + Python 3.14: forkserver (the new default) breaks
# Dramatiq's Canteen shared-memory mechanism, causing worker processes to never
# consume messages. See https://github.com/Bogdanp/dramatiq/issues/701
# Must be set before dramatiq.cli.main() spawns worker processes.
if "fork" in multiprocessing.get_all_start_methods():
multiprocessing.set_start_method("fork", force=True)
else:
logging.getLogger(__name__).warning(
"Multiprocessing start method 'fork' is unavailable; using default."
)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/backend/worker.py` around lines 23 - 31, The unconditional call to
multiprocessing.set_start_method("fork", force=True) will raise on platforms
without fork (e.g., Windows); wrap or gate this call so it only runs on
platforms that support fork (e.g., check os.name == "posix" or sys.platform
startswith "linux"/"darwin") or catch and ignore the specific exceptions
(RuntimeError/AttributeError) raised when the start method cannot be changed,
then leave a brief comment referencing Dramatiq's fork requirement; update the
module-level call to use that platform guard around
multiprocessing.set_start_method so the worker process startup doesn't hard-fail
on non-fork systems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants