Skip to content

Commit 54dfaf3

Browse files
feat: add PluggableContextTask for settings-based celery task context managers (#14572)
Add PluggableContextTask between DojoAsyncTask and PgHistoryTask that loads context managers from the CELERY_TASK_CONTEXT_MANAGERS setting. This allows plugins (e.g. Pro) to wrap all celery tasks with custom context managers without relying on celery signals (which don't fire in prefork workers). Also propagate sync kwarg from process_findings to dojo_dispatch_task in both DefaultImporter and DefaultReImporter so callers can force post_process_findings_batch to run in-process.
1 parent 0359bba commit 54dfaf3

3 files changed

Lines changed: 35 additions & 2 deletions

File tree

dojo/celery.py

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,44 @@ def apply_async(self, args=None, kwargs=None, **options):
7676
return super().apply_async(args=args, kwargs=kwargs, **options)
7777

7878

79-
class PgHistoryTask(DojoAsyncTask):
79+
class PluggableContextTask(DojoAsyncTask):
80+
81+
"""
82+
Extends DojoAsyncTask with pluggable context managers loaded from settings.
83+
84+
CELERY_TASK_CONTEXT_MANAGERS is a list of dotted paths to callables that
85+
return context managers. Each task execution is wrapped in all of them.
86+
This replaces the celery signal-based approach (task_prerun/task_postrun)
87+
which does not work reliably with prefork worker pools.
88+
"""
89+
90+
def __call__(self, *args, **kwargs):
91+
from contextlib import ExitStack # noqa: PLC0415
92+
93+
from django.utils.module_loading import import_string # noqa: PLC0415
94+
95+
cm_paths = getattr(settings, "CELERY_TASK_CONTEXT_MANAGERS", [])
96+
if not cm_paths:
97+
return super().__call__(*args, **kwargs)
98+
99+
# ExitStack ensures all entered context managers are properly exited
100+
# (via __exit__) even if the task raises an exception, so cleanup
101+
# and batch dispatch always happen.
102+
with ExitStack() as stack:
103+
for path in cm_paths:
104+
cm_factory = import_string(path)
105+
stack.enter_context(cm_factory())
106+
return super().__call__(*args, **kwargs)
107+
108+
109+
class PgHistoryTask(PluggableContextTask):
80110

81111
"""
82112
Custom Celery base task that automatically applies pghistory context.
83113
84-
This class inherits from DojoAsyncTask to provide:
114+
This class inherits from PluggableContextTask to provide:
85115
- User context injection and task tracking (from DojoAsyncTask)
116+
- Pluggable context managers from settings (from PluggableContextTask)
86117
- Automatic pghistory context application (from this class)
87118
88119
When a task is dispatched via dojo_dispatch_task or dojo_async_task, the current

dojo/importers/default_importer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ def process_findings(
273273
product_grading_option=True,
274274
issue_updater_option=True,
275275
push_to_jira=push_to_jira,
276+
sync=kwargs.get("sync", False),
276277
)
277278

278279
# No chord: tasks are dispatched immediately above per batch

dojo/importers/default_reimporter.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ def process_findings(
441441
issue_updater_option=True,
442442
push_to_jira=push_to_jira,
443443
jira_instance_id=getattr(self.jira_instance, "id", None),
444+
sync=kwargs.get("sync", False),
444445
)
445446

446447
# No chord: tasks are dispatched immediately above per batch

0 commit comments

Comments
 (0)