Conversation
📝 WalkthroughWalkthroughThis pull request executes a comprehensive migration from Celery to Dramatiq as the task queue framework. The changes involve replacing Celery decorators and patterns with new Dramatiq-based utilities (register_task, cron_task), migrating progress tracking to a cache-based mechanism, updating task ownership tracking via instance methods, removing legacy Celery configuration and dependencies, and refactoring the worker CLI and test infrastructure throughout the application. Changes
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
Tip Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
| return Response({ | ||
| "status": "FAILURE", | ||
| "result": None, | ||
| "error": str(exc), | ||
| }) |
Check warning
Code scanning / CodeQL
Information exposure through an exception Medium
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 3 months ago
In general, the fix is to avoid returning raw exception messages (or stack traces) directly to the client. Instead, log the detailed exception server-side and send the client a generic, non-sensitive error message or code while preserving the existing API contract where possible.
For this specific code, the best minimal fix is: in the except ResultFailure as exc: block, log the exception (including the stack trace) using the existing logger, and change the "error" field in the response to a generic message such as "Task failed" or "An internal error has occurred.". This keeps the response structure (status, result, error) and HTTP status code unchanged, so existing clients still receive "status": "FAILURE" and an error string, but without leaking internal exception details.
Concretely, in src/backend/core/api/viewsets/task.py, in the TaskDetailView.get method, update the except ResultFailure as exc: clause starting at line 94. Add a logger.exception(...) call before returning the response, and change "error": str(exc) to a generic error string. No new imports are needed because logging and logger are already defined at the top of the file.
| @@ -92,10 +92,11 @@ | ||
| except ResultMissing: | ||
| result_data = None | ||
| except ResultFailure as exc: | ||
| logger.exception("Task %s failed when fetching result.", task_id) | ||
| return Response({ | ||
| "status": "FAILURE", | ||
| "result": None, | ||
| "error": str(exc), | ||
| "error": "Task failed.", | ||
| }) | ||
|
|
||
| if result_data is not None: |
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (11)
src/backend/core/tests/api/test_messages_create.py (1)
1029-1047:⚠️ Potential issue | 🟡 MinorConsider adding
is_sender=Truetomessageon line 1029 to clarify the test's intent.
message2(the success path) now requiresis_sender=Truebecause the send endpoint reaches the asynchronoussend_message_task, which validates this flag.message(the EDITOR-denial path) currently lacks this flag, but the 403 response comes from the synchronous role check inIsAllowedToAccess.has_object_permission(), which runs before anyis_sendervalidation.Adding
is_sender=Truetomessageensures both test phases have identical message state, making it unambiguous that only the mailbox role differs. This aligns with test clarity best practices.Proposed fix
message = factories.MessageFactory( thread=thread_access.thread, is_draft=True, + is_sender=True, sender=factories.ContactFactory(mailbox=mailbox), )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/tests/api/test_messages_create.py` around lines 1029 - 1047, The test's denial path uses a MessageFactory-created message without is_sender=True which makes the 403 come from IsAllowedToAccess.has_object_permission rather than matching the async send_message_task validation; update the first message creation (the variable named message created via factories.MessageFactory) to include is_sender=True so both the denial path and the success path share the same message state and only the mailbox/role differs, ensuring clarity in the test intent and coverage.src/backend/pyproject.toml (1)
80-95:⚠️ Potential issue | 🟡 Minor
flowerdev-dependency is Celery-specific and should be removed.
flower==2.0.1is a web UI for Celery task monitoring and has no functionality outside a Celery stack. Withceleryremoved as a production dependency,floweris a dead dependency that will fail if invoked.🛠️ Proposed fix
dev = [ "django-extensions==4.1", "drf-spectacular-sidecar==2026.1.1", - "flower==2.0.1", "hypothesis==6.151.9",🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/pyproject.toml` around lines 80 - 95, Remove the dead Celery-specific dev-dependency by deleting the "flower==2.0.1" entry from the dev dependencies list in pyproject.toml; search for the exact token flower==2.0.1 in the file and remove that line (and trailing comma adjustment if necessary) so the remaining list remains valid TOML/JSON array syntax.compose.yaml (1)
169-188:⚠️ Potential issue | 🟠 Major
worker-uiservice is broken — still references removed Celery/Flower stack.Line 188 runs
celery -A messages.celery_app flower --port=8803, but Celery was removed as a dependency andmessages.celery_appwas deleted. This service will fail to start.Either remove this service entirely or replace it with a Dramatiq-compatible monitoring solution (e.g.,
django-dramatiqadmin UI or a custom status endpoint).🔧 Minimal fix — remove the broken service
- worker-ui: - build: - context: src/backend - target: runtime-dev - args: - DOCKER_USER: ${DOCKER_USER:-1000} - user: ${DOCKER_USER:-1000} - depends_on: - - redis - environment: - - FLOWER_UNAUTHENTICATED_API=true - - DJANGO_CONFIGURATION=Development - env_file: - - env.d/development/backend.defaults - - env.d/development/backend.local - volumes: - - ./src/backend:/app - ports: - - "8903:8803" - command: celery -A messages.celery_app flower --port=8803🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@compose.yaml` around lines 169 - 188, The worker-ui service is broken because it still runs the removed Celery/Flower command `celery -A messages.celery_app flower --port=8803`; remove or replace this service: either delete the `worker-ui` service block entirely, or update it to a Dramatiq-compatible monitor (e.g., run a dramatiq admin UI or a custom status endpoint) and remove the Celery-specific env/command references (`FLOWER_UNAUTHENTICATED_API`, `celery -A messages.celery_app flower --port=8803`) and adjust volumes/ports accordingly so the compose file only references existing Dramatiq components.src/backend/core/services/search/tasks.py (1)
115-153:⚠️ Potential issue | 🟠 Major
threads_vieweris a method; call it to get a queryset.Without
(),threads_qsis a bound method and.count()will fail.🐛 Fix method invocation
- threads_qs = models.Mailbox.objects.get(id=mailbox_id).threads_viewer + threads_qs = models.Mailbox.objects.get(id=mailbox_id).threads_viewer()🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/search/tasks.py` around lines 115 - 153, threads_viewer is a method on models.Mailbox but the code assigns the bound method to threads_qs, so .count()/.iterator() will fail; update the assignment to call the method (use models.Mailbox.objects.get(id=mailbox_id).threads_viewer()) so threads_qs is a queryset, leaving the subsequent .count(), .iterator(), index_thread() loop, and set_task_progress() logic unchanged.src/backend/core/services/importer/pst_tasks.py (1)
132-143:⚠️ Potential issue | 🟡 MinorWrap the progress calculation to meet the 100‑char line limit.
The inline conditional is over the 100‑char limit and is harder to read.
As per coding guidelines, Follow Django/PEP 8 style with a 100-character line limit.🧹 Suggested formatting
- progress_pct = min(int((current_message / total_messages) * 100), 99) if total_messages > 0 else 0 + if total_messages > 0: + progress_pct = min( + int(current_message / total_messages * 100), 99 + ) + else: + progress_pct = 0🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/importer/pst_tasks.py` around lines 132 - 143, The inline conditional computing progress_pct is too long; split the expression across multiple lines or compute it in two steps to respect the 100-char limit. For example, assign a safe_divisor check or intermediate ratio variable using current_message and total_messages, then compute progress_pct = min(int(ratio * 100), 99) if total_messages > 0 else 0, and keep the set_task_progress call intact; update the code around the progress_pct assignment in pst_tasks.py (symbols: progress_pct, current_message, total_messages, set_task_progress) to use the wrapped/stepwise form.src/backend/core/tests/importer/test_import_service.py (1)
159-171:⚠️ Potential issue | 🟡 MinorIneffective mock — patch target doesn't intercept calls from
eml_tasks.py.
eml_tasks.pyusesfrom core.mda.inbound import deliver_inbound_message, creating a local binding in that module. Patchingcore.mda.inbound.deliver_inbound_messageonly replaces the name incore.mda.inbound, not the already-bound reference ineml_tasks. As a resultmock_deliveris never called — the realdeliver_inbound_messageruns, and the test passes incidentally becauseis_import=Truecauses direct message creation anyway.The same bug is present in
test_import_file_eml_by_user_with_access_sync(line 238).🐛 Proposed fix
- with patch("core.mda.inbound.deliver_inbound_message", side_effect=mock_deliver): + with patch("core.services.importer.eml_tasks.deliver_inbound_message", side_effect=mock_deliver):Apply the same correction at line 238.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/tests/importer/test_import_service.py` around lines 159 - 171, The test tries to mock deliver_inbound_message but patches core.mda.inbound; because eml_tasks.py does "from core.mda.inbound import deliver_inbound_message" you must patch the symbol where it's used instead: update the patch target to core.mda.eml_tasks.deliver_inbound_message (for both the case around process_eml_file_task at the current block and the other test at the second occurrence in test_import_file_eml_by_user_with_access_sync) so that mock_deliver intercepts the call made by process_eml_file_task and returns True as intended.src/backend/core/tests/importer/test_imap_import.py (1)
176-223:⚠️ Potential issue | 🟡 MinorPatch target likely incorrect for
set_task_progressmock.
import_imap_messages_taskis defined incore.services.importer.imap_tasks, so patchingcore.services.importer.imap.set_task_progresswon’t intercept the calls and the mock assertions will fail. Update both occurrences to target the task module.🔧 Proposed fix
- with patch("core.services.importer.imap.set_task_progress") as mock_set_progress: + with patch("core.services.importer.imap_tasks.set_task_progress") as mock_set_progress: ... - with patch("core.services.importer.imap.set_task_progress") as mock_set_progress: + with patch("core.services.importer.imap_tasks.set_task_progress") as mock_set_progress:Also applies to: 281-339
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/tests/importer/test_imap_import.py` around lines 176 - 223, The test is patching set_task_progress in the wrong module; replace patch targets that currently use "core.services.importer.imap.set_task_progress" with the task module "core.services.importer.imap_tasks.set_task_progress" so the mock intercepts calls from import_imap_messages_task; update both places in this test file where set_task_progress is patched (the block around the successful import assertions and the other similar block later) to reference core.services.importer.imap_tasks.set_task_progress.src/backend/core/mda/inbound_tasks.py (2)
274-281:⚠️ Potential issue | 🟡 MinorMissing Sentry reporting for unhandled exceptions.
The broad
except Exceptionat line 274 logs and saves the error but does not callcapture_exception(e). This is inconsistent with the project guideline requiring Sentry reporting.Proposed fix
+from sentry_sdk import capture_exception ... except Exception as e: + capture_exception(e) logger.exception( "Error processing inbound message %s: %s", inbound_message_id, e )Based on learnings: "Capture and report exceptions to Sentry; use capture_exception() for custom errors."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/mda/inbound_tasks.py` around lines 274 - 281, The exception handler in inbound_tasks.py (the except Exception block that logs the error, updates inbound_message.error_message, and returns the failure dict) is missing Sentry reporting; add a call to capture_exception(e) (imported from sentry_sdk or the project's Sentry wrapper) inside this except block—preferably immediately after logger.exception and before updating inbound_message/save and returning—to ensure all unhandled exceptions in the function (where inbound_message is referenced) are reported to Sentry.
302-302:⚠️ Potential issue | 🔴 CriticalImport
timedeltafromdatetimemodule —timezone.timedeltadoes not exist.Line 302 uses
timezone.timedelta(minutes=5), butdjango.utils.timezonedoes not exposetimedelta. This will raiseAttributeErrorat runtime when the cron task fires. Importtimedeltafromdatetimeand use it directly.Fix
import re from typing import Any, Dict, Optional, Tuple +from datetime import timedelta from django.core.cache import cache from django.utils import timezone- retry_threshold = timezone.now() - timezone.timedelta(minutes=5) + retry_threshold = timezone.now() - timedelta(minutes=5)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/mda/inbound_tasks.py` at line 302, The code sets retry_threshold using timezone.timedelta which doesn't exist; import timedelta from datetime and change the assignment in inbound_tasks.py so retry_threshold = timezone.now() - timedelta(minutes=5) (keep the existing timezone.now() usage and update the import to include from datetime import timedelta) to avoid the AttributeError at runtime.src/backend/core/mda/outbound_tasks.py (1)
21-74:⚠️ Potential issue | 🟡 MinorMissing Sentry reporting in the archive exception handler.
Line 59 catches a broad exception during archiving but only logs it. Per project guidelines, exceptions should also be reported to Sentry via
capture_exception(). Also, ifmessage.threadisNone, theexceptblock at line 63 will itself raise anAttributeErrorwhen accessingthread.id, masking the original error.Proposed fix
+from sentry_sdk import capture_exception ... try: thread = message.thread models.Message.objects.filter(thread=thread).update( is_archived=True, archived_at=timezone.now() ) thread.update_stats() archived = True set_task_progress(90, {"message": "Thread archived"}) except Exception as e: # Not critical, just log the error + capture_exception(e) logger.exception( "Error in send_message_task when archiving thread %s after sending message %s: %s", - thread.id, + getattr(thread, "id", None), message_id, e, )Based on learnings: "Capture and report exceptions to Sentry; use capture_exception() for custom errors."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/mda/outbound_tasks.py` around lines 21 - 74, In send_message_task, the except block that handles archiving (around the thread variable) only logs the error and then references thread.id which can raise a new AttributeError; update this handler to call capture_exception(e) to report to Sentry, avoid dereferencing thread if it may be None by using a safe identifier (e.g., getattr(thread, "id", None) or message.thread_id) when building the log message, and keep the logger.exception call but include the safe id value; ensure the change is applied in the except block that currently references thread.id and logger.exception so original exceptions are reported and not masked.src/backend/core/services/exporter/tasks.py (1)
439-450:⚠️ Potential issue | 🟠 MajorProgress metadata structure in exporter tasks uses nested
resultdict, breaking API consumer expectations.The task progress API endpoint (
task.pyline 120) extracts the progress message viametadata.get("message"), expecting a flat metadata structure. However, the exporter code at lines 439–450 (and elsewhere in mbox/eml tasks) nests metadata as{"result": {...}, "error": None}, causing the API to return"message": Noneto the frontend. This differs from flat patterns inoutbound_tasks.pyandsearch/tasks.py, which use{"message": "..."}. The inconsistency causes progress messages to disappear for exporter/importer tasks. Align all task progress metadata to a single structure—either flatten exporter metadata to{"message": "...", "result": {...}, ...}or update the API consumer to handle both patterns.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/exporter/tasks.py` around lines 439 - 450, The exporter tasks are nesting progress under "result" which breaks the API consumer expecting top-level "message"; update all set_task_progress calls in exporter/mbox/eml task code (the call sites using set_task_progress that currently pass {"result": {...}, "error": None}) to include a top-level "message" (and other top-level fields like "error" if needed) while keeping the detailed stats under "result" (e.g. {"message": "Counting messages", "result": {...}, "error": None}); locate usages by searching for set_task_progress in exporter tasks and adjust the metadata shape to match the flat pattern used by outbound_tasks.py and search/tasks.py so task.py can read metadata.get("message") correctly.
♻️ Duplicate comments (5)
src/backend/core/tests/importer/test_pst_import.py (3)
1218-1221: Same concern as above regarding direct actor calls returning Message objects rather than results.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/tests/importer/test_pst_import.py` around lines 1218 - 1221, The test is directly invoking process_pst_file_task and getting an actor Message instead of the actual result; update the call to await the actor's response (e.g., use the actor's ask/ask_nowait or .ask(...).get() / await .ask(...) pattern your actor framework provides) so result contains the task return value rather than a Message object; modify the invocation of process_pst_file_task to use the actor await API and adjust assertions to use that awaited value.
1180-1183: Same concern as above regarding direct actor calls returning Message objects rather than results.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/tests/importer/test_pst_import.py` around lines 1180 - 1183, The test invokes the actor via process_pst_file_task directly which returns a Message object instead of the actual result; update the test to unwrap the actor response (for example by awaiting or calling the Message's result/get method or using the actor ask API) so you assert on the real return value rather than the Message placeholder — specifically change the call site around process_pst_file_task(file_key=file_key, recipient_id=str(mailbox.id)) to retrieve the underlying result before making assertions.
1261-1264: Same concern as above regarding direct actor calls returning Message objects rather than results.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/tests/importer/test_pst_import.py` around lines 1261 - 1264, The test is calling the actor method process_pst_file_task(...) directly which returns a Message object instead of the actual result; change the call to invoke the actor correctly and retrieve its return value (e.g., use the actor's asynchronous invocation API and await the result or call the remote/invoke method and then fetch the result) so that result holds the task output rather than a Message; update the invocation that passes file_key and recipient_id (mailbox.id) to use the actor's proper remote/async call pattern and await/get the returned value before asserting.src/backend/core/services/importer/eml_tasks.py (1)
9-10: Same logger-between-imports pattern asimap_tasks.py.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/importer/eml_tasks.py` around lines 9 - 10, The logger definition (logger = logging.getLogger(__name__)) is placed between import statements in eml_tasks.py which matches the problematic pattern from imap_tasks.py; move the logging import and the logger = logging.getLogger(__name__) line so that all import statements are grouped together at the top and the logger is defined immediately after the import block (i.e., ensure logging is imported with the other imports and logger is declared after imports), matching the placement used in imap_tasks.py and avoiding logger-between-imports ordering issues.src/backend/core/mda/inbound_tasks.py (1)
287-289: Same decorator order question as flagged inoutbound_tasks.py.
@cron_taskis outer,@register_taskis inner — same pattern as the outbound tasks. See the comment onoutbound_tasks.pyline 77-79 for details.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/mda/inbound_tasks.py` around lines 287 - 289, The decorators on process_inbound_messages_queue_task are in the wrong order: `@cron_task` is outer and `@register_task` is inner; swap them so `@register_task` is outer and `@cron_task` is directly above the function (i.e., apply register_task first, then cron_task) to match the pattern used in outbound tasks and ensure the task is registered before the cron wrapper wraps it (see process_inbound_messages_queue_task, cron_task, register_task and the similar pattern in outbound_tasks.py).
🧹 Nitpick comments (16)
src/backend/core/utils.py (2)
32-50:EagerBrokerdoesn't store failure results via the Results middleware.
rm.after_process_messageis called on the success path (line 43–44) but not in thefinally/exception path. Dramatiq'safter_process_messagesignature accepts anexceptionkeyword argument; failing to call it on error means the Results backend never records task failures when usingEagerBroker. Tests that verify failure status viamessage.get_result()would always seeResultMissinginstead ofResultFailure.♻️ Proposed fix
try: result = actor.fn(*message.args, **message.kwargs) if rm: rm.after_process_message(self, message, result=result) + except Exception as exc: + if rm: + rm.after_process_message(self, message, exception=exc) + raise finally: if cm: cm.after_process_message(self, message) if prev is not None: cm.before_process_message(self, prev)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/utils.py` around lines 32 - 50, The EagerBroker.enqueue implementation calls Results.after_process_message only on the success path, so failures aren't recorded; update enqueue to call rm.after_process_message in the exception path too by catching the exception, calling rm.after_process_message(self, message, exception=exc) before re-raising, and keep the existing call with result=result on success; refer to the enqueue method, the rm variable (Results middleware), and CurrentMessage handling to ensure middleware before/after calls remain balanced when propagating the exception.
20-21:TASK_TRACKING_CACHE_TTLhardcoded to 30 days — consider tying it to settings.The comment notes it "matches DRAMATIQ_RESULT_BACKEND TTL", but if that setting changes, these two will silently diverge. Exposing this as a Django setting (or deriving it from the result backend configuration) would keep them in sync.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/utils.py` around lines 20 - 21, TASK_TRACKING_CACHE_TTL is hardcoded to 30 days and can drift from the DRAMATIQ_RESULT_BACKEND TTL; change it to read from configuration instead. Update src/backend/core/utils.py to derive TASK_TRACKING_CACHE_TTL from a Django setting (e.g., settings.TASK_TRACKING_CACHE_TTL) or compute it from the DRAMATIQ_RESULT_BACKEND TTL (fetch the backend TTL from settings or the dramatiq config) and fall back to the existing 86400 * 30 default; keep TASK_PROGRESS_CACHE_TIMEOUT unchanged or also expose it via settings if desired. Ensure you reference and use the existing symbol TASK_TRACKING_CACHE_TTL in the module so callers remain unchanged.src/backend/core/tests/tasks/test_task_send_message.py (1)
69-71: Stale# pylint: disable=no-value-for-parametercomments.These suppressions were needed for Celery bound tasks (where
selfwas an explicit parameter). With Dramatiq,Actor.__call__accepts*args, **kwargs, so pylint won't emitno-value-for-parameterhere. The same suppression appears on lines 115, 154, and 176. All four can be removed.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/tests/tasks/test_task_send_message.py` around lines 69 - 71, Remove the now-stale pylint suppression comments that disable no-value-for-parameter for Dramatiq actors: delete the "# pylint: disable=no-value-for-parameter" suffix from the call sites of send_message_task in this test file (the occurrences at the call with str(draft_message.id), must_archive=True and the other three calls referenced on lines 115, 154, and 176). Ensure the send_message_task calls remain unchanged otherwise; just remove the pylint disable comments so the code no longer contains unnecessary suppressions.src/backend/core/services/importer/imap.py (1)
21-26:import loggingandloggerdeclaration are misplaced among imports.
loggingis a stdlib module and must appear in the stdlib import block (beforefrom django.conf import settings), andlogger = logging.getLogger(__name__)should follow all imports, not sit between them. The project's ruff isort config (section-order = ["future", "standard-library", "django", ...]) will flag this.♻️ Proposed fix
Move the
import loggingline up to be with the other stdlib imports (e.g., afterimport time):import base64 import codecs import imaplib +import logging import re import socket import ssl import time from typing import Any, Dict, List, Optional, Tuple from django.conf import settings -import logging -logger = logging.getLogger(__name__) - from core.mda.inbound import deliver_inbound_message from core.mda.rfc5322 import parse_email_message from core.utils import set_task_progress + +logger = logging.getLogger(__name__)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/importer/imap.py` around lines 21 - 26, The import order is wrong: move the "import logging" statement into the standard-library import block (e.g., alongside other stdlib imports such as "import time") so it appears before Django and third-party imports, and relocate "logger = logging.getLogger(__name__)" to the bottom of the import section (after all imports) so the module-level logger is declared only after imports are complete; update the file's import ordering around the existing "from django.conf import settings" and the lines referencing deliver_inbound_message, parse_email_message, and set_task_progress accordingly.src/backend/core/management/commands/run_task.py (1)
1-7: Stale "Celery" references throughout the file's documentation.The module docstring, class docstring (
line 19),helptext (line 22), andadd_argumentsdocstring (line 33) all still describe this as a "Celery task" runner. These should be updated to reflect the Dramatiq migration.♻️ Suggested docstring updates
-""" -Management command to run arbitrary Celery tasks synchronously. - -This command provides a Django interface to run Celery tasks with the same -CLI flags as the main Celery CLI, but executes them synchronously instead -of queuing them as background tasks. -""" +""" +Management command to run arbitrary tasks synchronously. + +This command provides a Django interface to run Dramatiq tasks synchronously +instead of queuing them as background tasks. +"""-class Command(BaseCommand): - """Run arbitrary Celery tasks synchronously.""" +class Command(BaseCommand): + """Run arbitrary Dramatiq tasks synchronously.""" help = """ - Run arbitrary Celery tasks synchronously. + Run arbitrary tasks synchronously.- parser.add_argument("task_name", help="Name of the Celery task to run") + parser.add_argument("task_name", help="Name of the task to run")🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/management/commands/run_task.py` around lines 1 - 7, Update all stale "Celery" references in this module to reflect the Dramatiq migration: change the module-level docstring, the Command class docstring, the Command.help text, and the add_arguments docstring (and any inline comments mentioning Celery) to refer to Dramatiq (e.g., "Dramatiq actors/tasks" and "Dramatiq CLI flags" as appropriate) and adjust wording where necessary so the docs accurately describe running Dramatiq actors synchronously instead of queuing; search for the Command class, its help attribute, and the add_arguments method to locate and update each string.src/backend/worker.py (1)
162-186: Validate--concurrencyis positive before passing it to Dramatiq.A user can pass 0 or a negative number; failing early yields clearer errors than letting Dramatiq fail later.
♻️ Suggested validation
- dramatiq_args = [ + processes = args.concurrency or 4 + if processes < 1: + sys.stderr.write("Error: --concurrency must be >= 1\n") + sys.exit(1) + + dramatiq_args = [ "dramatiq", "--path", ".", - "--processes", str(args.concurrency or 4), + "--processes", str(processes), "--threads", "1", "--worker-shutdown-timeout", "600000", ]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/worker.py` around lines 162 - 186, Validate that args.concurrency is a positive integer before building dramatiq_args: check args.concurrency (used when constructing dramatiq_args and passed to dramatiq.cli.main()) and if it's None, leave default behavior; if it's provided and <= 0, raise a clear error (e.g., argparse error or SystemExit with a descriptive message) or coerce to a minimum of 1. Perform this check right before creating dramatiq_args so the invalid value is rejected early and the error message references "--concurrency" and args.concurrency for clarity.src/backend/core/tests/api/test_import_file_upload.py (1)
75-87: Split owner vs non-owner checks into separate tests.This test now exercises two distinct paths; splitting keeps each test single-purpose and reduces assertion noise.
As per coding guidelines, Unit tests should focus on a single use case, keep assertions minimal, and cover all possible cases.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/tests/api/test_import_file_upload.py` around lines 75 - 87, Split the combined assertions into two focused tests: one that verifies non-owner access is forbidden and another that verifies the owner can access the task and sees the import result. Extract common setup (creating users, URL, and result_data) into a shared fixture or test setup, then implement test_non_owner_cannot_access where you create APIClient(), force_authenticate(user=user2) and assert GET(url) returns status.HTTP_403_FORBIDDEN, and implement test_owner_can_access_and_sees_result where you create APIClient(), force_authenticate(user=user1), mock.patch("dramatiq.Message.get_result", return_value=result_data) and assert GET(url) returns status.HTTP_200_OK and response.data["result"]["imported"] == 42; keep client1/client2, url, and result_data identifiers unchanged to locate the code.src/backend/core/tests/api/test_messages_import.py (1)
185-202: Remove leftover.status = "PENDING"mock attribute.
mock_task.return_value.status = "PENDING"is set but never asserted intest_api_import_mbox_async,test_api_import_pst_file_async, andtest_api_import_pst_autodetect. This is a Celery remnant —CeleryCompatActordoesn't expose astatusattribute after.delay().♻️ Proposed cleanup (same pattern in the other two async tests)
mock_task.return_value.id = "fake-task-id" - mock_task.return_value.status = "PENDING" response = api_client.post(🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/tests/api/test_messages_import.py` around lines 185 - 202, Remove the leftover Celery remnant setting mock_task.return_value.status = "PENDING" in the async import tests; locate the occurrences in test_api_import_mbox_async, test_api_import_pst_file_async, and test_api_import_pst_autodetect (the lines that set mock_task.return_value.status = "PENDING") and delete them so the mocked task only sets the id and not an unused status attribute.src/backend/core/services/importer/imap_tasks.py (1)
6-7: Moveloggerinitialization after all imports.Placing
logger = logging.getLogger(__name__)between import groups breaks the PEP 8 convention of grouping all imports before any module-level initializations. The same pattern appears ineml_tasks.pylines 9–10.♻️ Proposed fix
from typing import Any, Dict -import logging -logger = logging.getLogger(__name__) - +import logging + from core.models import Mailbox from core.utils import register_task, set_task_progress from .imap import ( ... ) + +logger = logging.getLogger(__name__)As per coding guidelines: "Follow Django/PEP 8 style with a 100-character line limit."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/importer/imap_tasks.py` around lines 6 - 7, The module-level logger initialization is placed between import groups which violates PEP8 import grouping; move the statement logger = logging.getLogger(__name__) so it appears after all imports in src/backend/core/services/importer/imap_tasks.py (and similarly adjust eml_tasks.py where present), keeping all standard/library/third-party/local imports grouped first and then declare the logger at module scope immediately after the final import.src/backend/core/tests/importer/test_file_import.py (1)
376-381:set_task_progresswill emit a warning log in these tests.
test_import_message_to_different_mailbox_same_domain(andtest_import_message_with_from_equal_to_mailbox_sets_is_senderat line 440) callprocess_eml_file_taskwithout mockingset_task_progress. Outside a Dramatiq actor contextCurrentMessage.get_current_message()returnsNone, causing a"set_task_progress called outside of a dramatiq actor"warning for each run. No functional failure, but it pollutes test output.♻️ Suppress the warning in isolation tests
- with patch("core.services.importer.eml_tasks.storages") as mock_storages: + with patch("core.services.importer.eml_tasks.storages") as mock_storages, \ + patch("core.services.importer.eml_tasks.set_task_progress"): mock_storages.__getitem__.return_value = mock_storage🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/tests/importer/test_file_import.py` around lines 376 - 381, The tests call process_eml_file_task outside a Dramatiq actor so set_task_progress emits a warning; to suppress it patch out set_task_progress in these tests. Update the tests (e.g., test_import_message_to_different_mailbox_same_domain and test_import_message_with_from_equal_to_mailbox_sets_is_sender) to wrap the call to process_eml_file_task with a patch of core.services.importer.eml_tasks.set_task_progress (or add a patch decorator) that replaces it with a no-op/mock, leaving the rest of the test unchanged.src/backend/core/mda/outbound_tasks.py (1)
162-179: Progress update fires only at batch boundaries, not on the last message.The condition
if index % batch_size == 0means the final progress update inside the loop will occur at the last multiple ofbatch_size, potentially skipping the tail. Consider also updating whenindex == total_messages - 1(or after the loop) so the final count is accurate.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/mda/outbound_tasks.py` around lines 162 - 179, The progress update currently triggers only when index % batch_size == 0, which can skip the final tail batch; modify the logic in the loop that iterates messages_to_process.iterator (using variables index, batch_size, total_messages, processed_count, success_count, error_count) to also call set_task_progress when index == total_messages - 1 (or otherwise ensure a final set_task_progress after the loop) so the final progress reflects all processed messages and updates the "processed_messages"/counts for the last partial batch.src/backend/core/services/exporter/tasks.py (3)
437-605: Noset_task_progress(100, ...)on completion.Progress reporting reaches 95 at line 548 (notification step) but never hits 100 on success. If any UI polls progress to determine completion, it will remain at 95 even after the task returns. Consider adding a final
set_task_progress(100, ...)before the success return on line 586.Proposed addition before the return
return {"status": "SUCCESS", "result": result, "error": None}→
+ set_task_progress(100, {"result": result, "error": None}) return {"status": "SUCCESS", "result": result, "error": None}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/exporter/tasks.py` around lines 437 - 605, The progress never reaches 100% on successful export; before the success return (after generate_presigned_url and _create_notification_message) call set_task_progress(100, {...}) with the final result payload (message_status "Export completed", total_messages, exported_count, skipped_count, s3_key) so the UI sees completion; update the same result dict you return and invoke set_task_progress(100, {"result": result, "error": None}) just prior to returning {"status": "SUCCESS", "result": result, "error": None}.
402-403:user_idparameter is unused in the function body.The
# pylint: disable=unused-argumentsuppresses the warning, butuser_idis never referenced insideexport_mailbox_task. If ownership tracking now happens at the call site viatask.track_owner(user_id), consider documenting why the parameter is still present (e.g., for result-backend metadata or future use), or remove it if callers have been updated.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/exporter/tasks.py` around lines 402 - 403, The export_mailbox_task function has an unused user_id parameter (export_mailbox_task) suppressed by pylint; either remove the parameter from the signature and update callers, or explicitly use it for metadata/ownership (e.g., pass into task.track_owner(user_id) or include in result-backend/task metadata) and add a short docstring comment explaining why it is needed; update any callers and tests accordingly so the signature and usage remain consistent.
15-16: Import ordering:loggingplaced between Django and local imports.
import logging(stdlib) should precede all third-party/Django imports per PEP 8 / isort convention.Suggested reordering
+import logging import gzip import html import io import re + from datetime import datetime, timezone from email.message import EmailMessage from email.utils import format_datetime from typing import Any, Dict from django.conf import settings from django.core.files.storage import storages -import logging -logger = logging.getLogger(__name__) - from core.api.utils import generate_presigned_url ... + +logger = logging.getLogger(__name__)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/services/exporter/tasks.py` around lines 15 - 16, Move the standard-library import `import logging` to the top of the import block so it appears before any third-party/Django and local imports (PEP8/isort order), then keep the existing `logger = logging.getLogger(__name__)` line unchanged; this affects the module-level imports in the file and ensures `logging` is declared before Django imports like any other stdlib module.src/backend/core/mda/inbound_tasks.py (2)
318-329:.delay()vs.send()— docstring says.send()but code uses.delay().Line 293 documents the call as
process_inbound_message_task.send()but line 321 uses.delay(). PresumablyCeleryCompatActoraliases both, but the inconsistency could confuse future contributors. Consider aligning one way.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/mda/inbound_tasks.py` around lines 318 - 329, The loop uses process_inbound_message_task.delay(...) while the docstring (and CeleryCompatActor abstraction) references .send(...), creating an inconsistency; standardize to a single API by replacing the .delay calls in the old_messages retry loop with the documented .send(...) (or alternatively change the docstring to document .delay if that is preferred) and ensure process_inbound_message_task (CeleryCompatActor) supports the chosen alias so callers and docs match (update the loop that iterates old_messages and the docstring referencing process_inbound_message_task accordingly).
5-19: Import ordering:loggingandrequestsare placed after Django imports.Standard Python convention (and isort defaults) place stdlib imports (
logging) before third-party imports (django.*,requests), and third-party imports before local imports (core.*). Currentlylogging(line 11) andrequests(line 16) are interleaved with local imports.Suggested reordering
import re +import logging from typing import Any, Dict, Optional, Tuple from django.core.cache import cache from django.utils import timezone -import logging +import requests from core import models from core.mda.inbound_create import _create_message_from_inbound from core.mda.rfc5322 import parse_email_message -import requests from core.utils import cron_task, register_task🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/backend/core/mda/inbound_tasks.py` around lines 5 - 19, The import ordering mixes stdlib and third-party/local imports; move the stdlib import "logging" to the top group, keep third-party Django imports ("django.core.cache", "django.utils") in the middle group, then third-party "requests" after Django, and finally local imports from "core" (e.g., models, _create_message_from_inbound, parse_email_message, cron_task, register_task); adjust the import blocks around the module-level symbols logger and any use of requests so imports follow the stdlib → third-party → local convention.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
src/backend/uv.lockis excluded by!**/*.lock
📒 Files selected for processing (38)
Makefilecompose.yamlsrc/backend/core/admin.pysrc/backend/core/api/viewsets/send.pysrc/backend/core/api/viewsets/task.pysrc/backend/core/management/commands/run_task.pysrc/backend/core/mda/inbound_tasks.pysrc/backend/core/mda/outbound.pysrc/backend/core/mda/outbound_tasks.pysrc/backend/core/services/exporter/tasks.pysrc/backend/core/services/importer/eml_tasks.pysrc/backend/core/services/importer/imap.pysrc/backend/core/services/importer/imap_tasks.pysrc/backend/core/services/importer/mbox_tasks.pysrc/backend/core/services/importer/pst_tasks.pysrc/backend/core/services/importer/service.pysrc/backend/core/services/search/tasks.pysrc/backend/core/tasks.pysrc/backend/core/tests/api/test_draft_attachments.pysrc/backend/core/tests/api/test_import_file_upload.pysrc/backend/core/tests/api/test_messages_create.pysrc/backend/core/tests/api/test_messages_import.pysrc/backend/core/tests/exporter/test_export_task.pysrc/backend/core/tests/importer/test_file_import.pysrc/backend/core/tests/importer/test_imap_import.pysrc/backend/core/tests/importer/test_import_service.pysrc/backend/core/tests/importer/test_pst_import.pysrc/backend/core/tests/mda/test_retry.pysrc/backend/core/tests/mda/test_spam_processing.pysrc/backend/core/tests/tasks/test_task_importer.pysrc/backend/core/tests/tasks/test_task_send_message.pysrc/backend/core/tests/test_worker.pysrc/backend/core/utils.pysrc/backend/messages/__init__.pysrc/backend/messages/celery_app.pysrc/backend/messages/settings.pysrc/backend/pyproject.tomlsrc/backend/worker.py
💤 Files with no reviewable changes (1)
- src/backend/messages/celery_app.py
| try: | ||
| result_data = message.get_result(block=False) | ||
| except ResultMissing: | ||
| result_data = None | ||
| except ResultFailure as exc: | ||
| return Response({ | ||
| "status": "FAILURE", | ||
| "result": None, | ||
| "error": str(exc), | ||
| }) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Find the project's dependency file to check Dramatiq version
fd -e "requirements.txt" -e "pyproject.toml" -e "poetry.lock" -e "Pipfile" | head -20Repository: suitenumerique/messages
Length of output: 49
🏁 Script executed:
# Check the actual code at the specified location
cat -n src/backend/core/api/viewsets/task.py | sed -n '85,105p'Repository: suitenumerique/messages
Length of output: 910
🏁 Script executed:
# Search for any existing Sentry imports in the backend
rg "from sentry_sdk import|import sentry_sdk" src/backend/ -A 2Repository: suitenumerique/messages
Length of output: 1588
🏁 Script executed:
# Check if there's a logger already configured in this file
rg "import logging|logger = " src/backend/core/api/viewsets/task.py -A 2Repository: suitenumerique/messages
Length of output: 140
🏁 Script executed:
# Find dependency files
find . -name "requirements*.txt" -o -name "pyproject.toml" -o -name "setup.py" | grep -v __pycache__ | head -10Repository: suitenumerique/messages
Length of output: 226
🏁 Script executed:
# Check git history or setup files for Dramatiq version
rg "dramatiq" . --max-count=5 -A 2 | head -30Repository: suitenumerique/messages
Length of output: 2047
🌐 Web query:
Dramatiq 2.0.1 ResultFailure exception __str__ what does it return
💡 Result:
In Dramatiq 2.0.1, ResultFailure does not override __str__; it inherits it from dramatiq.errors.DramatiqError. That __str__ returns:
str(self.message)if it’s truthy, otherwiserepr(self.message)(fallback for falsy / empty messages). [1]
ResultFailure.__init__ just stores the message plus orig_exc_type / orig_exc_msg; those extra fields do not affect __str__ directly. [2]
So str(e) where e is a ResultFailure is essentially whatever message Dramatiq constructed when raising it (commonly shown as "actor raised <ExceptionType>: <ExceptionMessage>"). [3]
Sources:
[1] DramatiqError __str__ implementation https://dramatiq.io/_modules/dramatiq/errors.html
[2] ResultFailure definition https://dramatiq.io/_modules/dramatiq/results/errors.html
[3] Docs example showing the message format https://dramatiq.io/reference.html
🏁 Script executed:
# Check for similar exception handling patterns in the codebase
rg "ResultFailure|ResultMissing" src/backend/ -B 3 -A 5Repository: suitenumerique/messages
Length of output: 1355
🏁 Script executed:
# Look for other places where exceptions are returned to clients in viewsets
rg "return Response.*str\(exc\)" src/backend/ -B 2 -A 2Repository: suitenumerique/messages
Length of output: 49
🏁 Script executed:
# Check the imports in the task viewset file
cat -n src/backend/core/api/viewsets/task.py | head -30Repository: suitenumerique/messages
Length of output: 991
Capture exceptions internally and return a generic error message to clients.
str(exc) exposes the Dramatiq task failure message (formatted as "actor raised <ExceptionType>: <ExceptionMessage>") which can leak internal exception details to clients. Log and capture the exception, then return a generic error response.
Add the import and update the exception handler:
Fix
import dramatiq
+from sentry_sdk import capture_exception except ResultFailure as exc:
+ logger.exception("Task %s failed", task_id)
+ capture_exception(exc)
return Response({
"status": "FAILURE",
"result": None,
- "error": str(exc),
+ "error": "Task execution failed.",
})🧰 Tools
🪛 GitHub Check: CodeQL
[warning] 95-99: Information exposure through an exception
Stack trace information flows to this location and may be exposed to an external user.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/core/api/viewsets/task.py` around lines 90 - 99, The
ResultFailure handler currently returns str(exc) which can leak internal
exception details; update the exception handling around message.get_result (the
except ResultFailure as exc block) to log the full exception internally (e.g.,
logger.exception or process_logger.exception after importing the logger) and
return a generic client-safe Response payload (e.g., {"status": "FAILURE",
"result": None, "error": "Task failed"}). Add the necessary logger import and
replace the returned error string with the generic message while ensuring the
original exception is captured in the internal log call.
| # Check if we have progress data for this task | ||
| progress_data = get_task_progress(task_id) | ||
| if progress_data: | ||
| return Response( | ||
| { | ||
| "status": "PROGRESS", | ||
| "result": None, | ||
| "error": None, | ||
| "progress": progress_data.get("progress"), | ||
| "message": progress_data.get("metadata", {}).get("message"), | ||
| "timestamp": progress_data.get("timestamp"), | ||
| } |
There was a problem hiding this comment.
Expose progress metadata in PROGRESS responses.
set_task_progress stores metadata (e.g., mbox tasks put counts under metadata["result"]), but the API currently drops it and returns message=None. This hides meaningful progress details from clients.
🔧 Proposed fix
progress_data = get_task_progress(task_id)
if progress_data:
+ metadata = progress_data.get("metadata", {}) or {}
return Response(
{
"status": "PROGRESS",
- "result": None,
- "error": None,
+ "result": metadata or None,
+ "error": metadata.get("error"),
"progress": progress_data.get("progress"),
- "message": progress_data.get("metadata", {}).get("message"),
+ "message": metadata.get("message")
+ or metadata.get("result", {}).get("message_status"),
"timestamp": progress_data.get("timestamp"),
}
)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/core/api/viewsets/task.py` around lines 113 - 124, The PROGRESS
response drops useful metadata set by set_task_progress; update the block that
uses get_task_progress (in the viewset handling task status) to include the
metadata payload in the Response: return the metadata object (e.g.,
progress_data.get("metadata")) and surface any nested fields like
metadata.get("result") and metadata.get("message") instead of returning
message=None; keep existing keys (status, result, error, progress, timestamp)
but populate result/message from progress_data["metadata"] when present so
clients receive the stored progress details.
| from core.models import Mailbox | ||
|
|
||
| from messages.celery_app import app as celery_app | ||
| from core.utils import register_task, set_task_progress |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's find all occurrences of set_task_progress in the codebase
rg -n "set_task_progress" --type=pyRepository: suitenumerique/messages
Length of output: 4379
🏁 Script executed:
# Now let's examine the imap_tasks.py file to see what functions are defined and used
cat -n src/backend/core/services/importer/imap_tasks.pyRepository: suitenumerique/messages
Length of output: 6364
🏁 Script executed:
# Let's also look at the other importer task files mentioned for comparison
ls -la src/backend/core/services/importer/Repository: suitenumerique/messages
Length of output: 798
Remove the unused set_task_progress import from line 10.
set_task_progress is imported in imap_tasks.py but is never called anywhere in this file. Unlike other importer tasks (eml_tasks.py, mbox_tasks.py, pst_tasks.py), which call it directly, the IMAP importer delegates progress reporting to process_folder_messages in imap.py, which imports set_task_progress independently. Remove the unused import to clean up dead code.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/core/services/importer/imap_tasks.py` at line 10, Remove the
unused import set_task_progress from the top-level import in imap_tasks.py: keep
register_task but delete set_task_progress since progress reporting for IMAP is
handled by process_folder_messages in imap.py which imports set_task_progress
itself; update the import line that currently references both register_task and
set_task_progress to only import register_task.
| # Create a draft message (is_sender=True since this is an outbound draft) | ||
| draft = factories.MessageFactory( | ||
| thread=thread, sender=sender, is_draft=True, subject="Existing draft" | ||
| thread=thread, sender=sender, is_draft=True, is_sender=True, subject="Existing draft" | ||
| ) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
rg -n --type=py 'is_sender' --glob '!**/tests/**' src/backend/Repository: suitenumerique/messages
Length of output: 4778
🏁 Script executed:
# Check the send-message view/endpoint to see if it validates or checks is_sender
fd -t f '\.py$' src/backend/ --glob '!**/tests/**' | xargs rg -l 'send.*message|send.*draft' | head -10Repository: suitenumerique/messages
Length of output: 659
🏁 Script executed:
# Examine the test at line 534 and surrounding context
sed -n '530,545p' src/backend/core/tests/api/test_draft_attachments.pyRepository: suitenumerique/messages
Length of output: 665
🏁 Script executed:
# Check the send-message endpoint implementation
sed -n '150,180p' src/backend/core/api/viewsets/message.pyRepository: suitenumerique/messages
Length of output: 1384
🏁 Script executed:
# Check the serializer validation around line 842
sed -n '830,860p' src/backend/core/api/serializers.pyRepository: suitenumerique/messages
Length of output: 1157
🏁 Script executed:
# Check the full test method at line 534 to see what it expects
sed -n '530,570p' src/backend/core/tests/api/test_draft_attachments.pyRepository: suitenumerique/messages
Length of output: 1807
🏁 Script executed:
# Check draft.py to see how drafts are created with is_sender
sed -n '380,395p' src/backend/core/mda/draft.pyRepository: suitenumerique/messages
Length of output: 483
🏁 Script executed:
# Find the send-message endpoint implementation
rg -n 'send-message|send_message' src/backend/core/api/ --type=pyRepository: suitenumerique/messages
Length of output: 411
🏁 Script executed:
# Look for the send endpoint in viewsets
rg -n 'def send' src/backend/core/api/viewsets/ --type=py -A 3Repository: suitenumerique/messages
Length of output: 449
🏁 Script executed:
# Check if there's a send serializer
fd -t f -name '*.py' src/backend/core/api/ | xargs rg -l 'send.*serializer' -iRepository: suitenumerique/messages
Length of output: 476
🏁 Script executed:
# Examine the send viewset implementation
cat -n src/backend/core/api/viewsets/send.pyRepository: suitenumerique/messages
Length of output: 5695
🏁 Script executed:
# Check send_message_task in outbound_tasks
sed -n '110,150p' src/backend/core/mda/outbound_tasks.pyRepository: suitenumerique/messages
Length of output: 1287
🏁 Script executed:
# Check prepare_outbound_message function to see if it validates is_sender
rg -n 'def prepare_outbound_message' src/backend/core/mda/outbound.py -A 30Repository: suitenumerique/messages
Length of output: 1297
🏁 Script executed:
# Also check the test expectations more carefully - what does the test expect when sending a draft without is_sender?
sed -n '520,570p' src/backend/core/tests/api/test_draft_attachments.pyRepository: suitenumerique/messages
Length of output: 2166
Align is_sender=True on outbound draft at line 534 with the change at line 138
The is_sender=True addition at line 138 is correct and establishes the pattern that outbound drafts should be marked as sender-side. However, test_draft_send_with_attachments_exceeding_size_limit at line 534 creates an identically structured outbound draft without it:
draft = factories.MessageFactory(
thread=thread, sender=sender, is_draft=True, subject="Test draft" # is_sender=True missing
)While the test passes (the size-limit check fires before any is_sender consideration), the draft is semantically inconsistent with the outbound-draft convention. For consistency, add is_sender=True to line 534 to match line 138.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/core/tests/api/test_draft_attachments.py` around lines 136 - 139,
In the test_draft_send_with_attachments_exceeding_size_limit test in
src/backend/core/tests/api/test_draft_attachments.py, update the MessageFactory
call that creates the outbound draft to include is_sender=True so it matches the
outbound-draft convention used elsewhere (e.g., the earlier draft creation);
specifically, add is_sender=True to the factories.MessageFactory(...) invocation
inside test_draft_send_with_attachments_exceeding_size_limit.
| import multiprocessing | ||
| import os | ||
| import sys | ||
|
|
||
| # Workaround for Dramatiq + Python 3.14: forkserver (the new default) breaks | ||
| # Dramatiq's Canteen shared-memory mechanism, causing worker processes to never | ||
| # consume messages. See https://github.com/Bogdanp/dramatiq/issues/701 | ||
| # Must be set before dramatiq.cli.main() spawns worker processes. | ||
| multiprocessing.set_start_method("fork", force=True) |
There was a problem hiding this comment.
Guard the fork-only start method for unsupported platforms.
set_start_method("fork") raises on platforms without fork (e.g., Windows). A small guard avoids hard failures outside Linux environments.
🛠️ Suggested guard
-multiprocessing.set_start_method("fork", force=True)
+if "fork" in multiprocessing.get_all_start_methods():
+ multiprocessing.set_start_method("fork", force=True)
+else:
+ logging.getLogger(__name__).warning(
+ "Multiprocessing start method 'fork' is unavailable; using default."
+ )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| import multiprocessing | |
| import os | |
| import sys | |
| # Workaround for Dramatiq + Python 3.14: forkserver (the new default) breaks | |
| # Dramatiq's Canteen shared-memory mechanism, causing worker processes to never | |
| # consume messages. See https://github.com/Bogdanp/dramatiq/issues/701 | |
| # Must be set before dramatiq.cli.main() spawns worker processes. | |
| multiprocessing.set_start_method("fork", force=True) | |
| import logging | |
| import multiprocessing | |
| import os | |
| import sys | |
| # Workaround for Dramatiq + Python 3.14: forkserver (the new default) breaks | |
| # Dramatiq's Canteen shared-memory mechanism, causing worker processes to never | |
| # consume messages. See https://github.com/Bogdanp/dramatiq/issues/701 | |
| # Must be set before dramatiq.cli.main() spawns worker processes. | |
| if "fork" in multiprocessing.get_all_start_methods(): | |
| multiprocessing.set_start_method("fork", force=True) | |
| else: | |
| logging.getLogger(__name__).warning( | |
| "Multiprocessing start method 'fork' is unavailable; using default." | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/backend/worker.py` around lines 23 - 31, The unconditional call to
multiprocessing.set_start_method("fork", force=True) will raise on platforms
without fork (e.g., Windows); wrap or gate this call so it only runs on
platforms that support fork (e.g., check os.name == "posix" or sys.platform
startswith "linux"/"darwin") or catch and ignore the specific exceptions
(RuntimeError/AttributeError) raised when the start method cannot be changed,
then leave a brief comment referencing Dramatiq's fork requirement; update the
module-level call to use that platform guard around
multiprocessing.set_start_method so the worker process startup doesn't hard-fail
on non-fork systems.
Replaces #390
Summary by CodeRabbit