Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions dojo/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,26 @@ def __call__(self, *args, **kwargs):
"""
Restore user context in the celery worker via crum.impersonate.

The apply_async method injects ``async_user`` into kwargs when a task
is dispatched. Here we pop it and set it as the current user in
thread-local storage so that all downstream code — including nested
dojo_dispatch_task calls — sees the correct user via
get_current_user().
The apply_async method injects ``async_user_id`` into kwargs when a task
is dispatched. Here we pop it, resolve to a user instance, and set it
as the current user in thread-local storage so that all downstream
code — including nested dojo_dispatch_task calls — sees the correct
user via get_current_user().

When a task is called directly (not via apply_async), async_user is
When a task is called directly (not via apply_async), async_user_id is
not in kwargs. In that case we leave the existing crum context
intact so that callers who already set a user (e.g. via
crum.impersonate in tests or request middleware) are not disrupted.
"""
if "async_user" not in kwargs:
if "async_user_id" not in kwargs:
return super().__call__(*args, **kwargs)

import crum # noqa: PLC0415

user = kwargs.pop("async_user")
from dojo.models import Dojo_User # noqa: PLC0415 circular import

user_id = kwargs.pop("async_user_id")
user = Dojo_User.objects.filter(pk=user_id).first() if user_id else None
with crum.impersonate(user):
return super().__call__(*args, **kwargs)

Expand All @@ -59,8 +62,9 @@ def apply_async(self, args=None, kwargs=None, **options):
# Inject user context for Dojo tasks only. Celery built-in tasks (e.g.
# celery.backend_cleanup) do not accept custom kwargs.
task_name = self.name or ""
if not task_name.startswith("celery.") and "async_user" not in kwargs:
kwargs["async_user"] = get_current_user()
if not task_name.startswith("celery.") and "async_user_id" not in kwargs:
user = get_current_user()
kwargs["async_user_id"] = user.id if user else None

# Control flag used for sync/async decision; never pass into the task itself
kwargs.pop("sync", None)
Expand Down Expand Up @@ -135,8 +139,6 @@ def __call__(self, *args, **kwargs):

app = Celery("dojo", task_cls=PgHistoryTask)

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object("django.conf:settings", namespace="CELERY")

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Expand Down
7 changes: 4 additions & 3 deletions dojo/celery_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ def apply_async(self, args: Any | None = None, kwargs: Any | None = None, **opti

def _inject_async_user(kwargs: Mapping[str, Any] | None) -> dict[str, Any]:
result: dict[str, Any] = dict(kwargs or {})
if "async_user" not in result:
if "async_user_id" not in result:
from dojo.utils import get_current_user # noqa: PLC0415 circular import

result["async_user"] = get_current_user()
user = get_current_user()
result["async_user_id"] = user.id if user else None
return result


Expand Down Expand Up @@ -58,7 +59,7 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur
"""
Dispatch a task/signature using DefectDojo semantics.

- Inject `async_user` if missing.
- Inject `async_user_id` if missing.
- Capture and inject pghistory context if available.
- Respect `sync=True` (foreground execution) and user `block_execution`.
- Support `countdown=<seconds>` for async dispatch.
Expand Down
2 changes: 1 addition & 1 deletion dojo/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def __wrapper__(*args, **kwargs):
from dojo.utils import get_current_user # noqa: PLC0415 circular import

user = get_current_user()
kwargs["async_user"] = user
kwargs["async_user_id"] = user.id if user else None

# Capture pghistory context to pass to Celery worker
# The PgHistoryTask base class will apply this context in the worker
Expand Down
6 changes: 3 additions & 3 deletions dojo/forms.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
import logging
import pickle
import re
import warnings
from datetime import date, datetime
Expand Down Expand Up @@ -3477,7 +3477,7 @@ def __init__(self, attrs=None):

def decompress(self, value):
if value:
return pickle.loads(value)
return json.loads(value)
return [None, None, None, None, None, None]

def format_output(self, rendered_widgets):
Expand All @@ -3497,7 +3497,7 @@ def __init__(self, *args, **kwargs):
super().__init__(list_fields, *args, **kwargs)

def compress(self, values):
return pickle.dumps(values)
return json.dumps(values)


class CreateChoiceQuestionForm(forms.Form):
Expand Down
23 changes: 21 additions & 2 deletions dojo/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,16 @@ def save(self, *args, **kwargs):
from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync # noqa: I001, PLC0415 circular import
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import

dojo_dispatch_task(async_update_sla_expiration_dates_sla_config_sync, self, products, severities=severities)
dojo_dispatch_task(
async_update_sla_expiration_dates_sla_config_sync,
self.id,
list(products.values_list("id", flat=True)),
severities=severities,
)
# The async task refetches and resets async_updating on its own copy.
# Mirror that on this in-memory instance so a subsequent save() on the
# same instance does not trigger the lock-revert path earlier in save().
self.async_updating = False

def clean(self):
sla_days = [self.critical, self.high, self.medium, self.low]
Expand Down Expand Up @@ -1255,7 +1264,17 @@ def save(self, *args, **kwargs):
from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync # noqa: I001, PLC0415 circular import
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import

dojo_dispatch_task(async_update_sla_expiration_dates_sla_config_sync, sla_config, Product.objects.filter(id=self.id))
dojo_dispatch_task(
async_update_sla_expiration_dates_sla_config_sync,
sla_config.id,
[self.id],
)
# The async task refetches and resets async_updating on its own copies.
# Mirror that on this in-memory product and the in-memory sla_config so a
# subsequent save() on either does not trigger their lock-revert paths.
self.async_updating = False
if sla_config:
sla_config.async_updating = False

def get_absolute_url(self):
return reverse("view_product", args=[str(self.id)])
Expand Down
46 changes: 21 additions & 25 deletions dojo/notifications/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from dojo import __version__ as dd_version
from dojo.authorization.roles_permissions import Permissions
from dojo.celery_dispatch import dojo_dispatch_task
from dojo.decorators import we_want_async
from dojo.labels import get_labels
from dojo.models import (
Expand Down Expand Up @@ -834,58 +833,55 @@ def _process_notifications(

# Some errors should not be pushed to all channels, only to alerts.
# For example reasons why JIRA Issues: https://github.com/DefectDojo/django-DefectDojo/issues/11575
# Per-channel sends run synchronously inside the surrounding async_create_notification
# task body. Dispatching inner Celery tasks would require JSON-serializable kwargs, but
# callers pass model instances (finding/test/engagement/product/...) and refetching every
# one of them per channel would multiply DB queries; running synchronously avoids both.
if not alert_only:
user_id = getattr(notifications.user, "id", None)
if self.system_settings.enable_slack_notifications and "slack" in getattr(
notifications,
event,
notifications.other,
):
logger.debug("Sending Slack Notification")
dojo_dispatch_task(
send_slack_notification,
event,
user_id=getattr(notifications.user, "id", None),
**kwargs,
)
try:
send_slack_notification.run(event, user_id=user_id, **kwargs)
except Exception:
logger.exception("Failed to send Slack notification for event %s", event)

if self.system_settings.enable_msteams_notifications and "msteams" in getattr(
notifications,
event,
notifications.other,
):
logger.debug("Sending MSTeams Notification")
dojo_dispatch_task(
send_msteams_notification,
event,
user_id=getattr(notifications.user, "id", None),
**kwargs,
)
try:
send_msteams_notification.run(event, user_id=user_id, **kwargs)
except Exception:
logger.exception("Failed to send MSTeams notification for event %s", event)

if self.system_settings.enable_mail_notifications and "mail" in getattr(
notifications,
event,
notifications.other,
):
logger.debug("Sending Mail Notification")
dojo_dispatch_task(
send_mail_notification,
event,
user_id=getattr(notifications.user, "id", None),
**kwargs,
)
try:
send_mail_notification.run(event, user_id=user_id, **kwargs)
except Exception:
logger.exception("Failed to send Mail notification for event %s", event)

if self.system_settings.enable_webhooks_notifications and "webhooks" in getattr(
notifications,
event,
notifications.other,
):
logger.debug("Sending Webhooks Notification")
dojo_dispatch_task(
send_webhooks_notification,
event,
user_id=getattr(notifications.user, "id", None),
**kwargs,
)
try:
send_webhooks_notification.run(event, user_id=user_id, **kwargs)
except Exception:
logger.exception("Failed to send Webhooks notification for event %s", event)


def process_tag_notifications(request, note, parent_url, parent_title):
Expand Down
4 changes: 3 additions & 1 deletion dojo/notifications/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ def log_generic_alert(source, title, description):


@app.task(bind=True)
def add_alerts(self, runinterval, *args, **kwargs):
def add_alerts(self, *args, **kwargs):
# Run interval matches the beat schedule for this task (see CELERY_BEAT_SCHEDULE).
runinterval = timedelta(hours=1)
now = timezone.now()

upcoming_engagements = Engagement.objects.filter(target_start__gt=now + timedelta(days=3), target_start__lt=now + timedelta(days=3) + runinterval).order_by("target_start")
Expand Down
6 changes: 3 additions & 3 deletions dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
DD_CELERY_RESULT_BACKEND=(str, "django-db"),
DD_CELERY_RESULT_EXPIRES=(int, 86400),
DD_CELERY_BEAT_SCHEDULE_FILENAME=(str, root("dojo.celery.beat.db")),
DD_CELERY_TASK_SERIALIZER=(str, "pickle"),
DD_CELERY_TASK_SERIALIZER=(str, "json"),
DD_CELERY_LOG_LEVEL=(str, "INFO"),
# Hard ceiling on task runtime. When reached, the worker process is sent SIGKILL — no cleanup
# code runs. Always set higher than DD_CELERY_TASK_SOFT_TIME_LIMIT. (0 = disabled, no limit)
Expand Down Expand Up @@ -847,8 +847,9 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param
CELERY_TIMEZONE = TIME_ZONE
CELERY_RESULT_EXPIRES = env("DD_CELERY_RESULT_EXPIRES")
CELERY_BEAT_SCHEDULE_FILENAME = env("DD_CELERY_BEAT_SCHEDULE_FILENAME")
CELERY_ACCEPT_CONTENT = ["pickle", "json", "msgpack", "yaml"]
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = env("DD_CELERY_TASK_SERIALIZER")
CELERY_RESULT_SERIALIZER = "json"
CELERY_LOG_LEVEL = env("DD_CELERY_LOG_LEVEL")

if env("DD_CELERY_TASK_TIME_LIMIT") > 0:
Expand All @@ -872,7 +873,6 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param
"add-alerts": {
"task": "dojo.notifications.tasks.add_alerts",
"schedule": timedelta(hours=1),
"args": [timedelta(hours=1)],
"options": {
"expires": int(60 * 60 * 1 * 1.2), # If a task is not executed within 72 minutes, it should be dropped from the queue. Two more tasks should be scheduled in the meantime.
},
Expand Down
7 changes: 6 additions & 1 deletion dojo/sla_config/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@


@app.task
def async_update_sla_expiration_dates_sla_config_sync(sla_config: SLA_Configuration, products: list[Product], *args, severities: list[str] | None = None, **kwargs):
def async_update_sla_expiration_dates_sla_config_sync(sla_config_id: int, product_ids: list[int], *args, severities: list[str] | None = None, **kwargs):
sla_config = SLA_Configuration.objects.filter(pk=sla_config_id).first()
if sla_config is None:
logger.info("SLA_Configuration with id %s no longer exists, skipping update", sla_config_id)
return
products = Product.objects.filter(pk__in=product_ids)
if method := get_custom_method("FINDING_SLA_EXPIRATION_CALCULATION_METHOD"):
method(sla_config, products, severities=severities)
else:
Expand Down
4 changes: 2 additions & 2 deletions dojo/survey/views.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import pickle
import json
from datetime import date, timedelta

from django.contrib import messages
Expand Down Expand Up @@ -490,7 +490,7 @@ def create_question(request):
order=form.cleaned_data["order"],
text=form.cleaned_data["text"],
multichoice=choiceQuestionFrom.cleaned_data["multichoice"])
choices_to_process = pickle.loads(choiceQuestionFrom.cleaned_data["answer_choices"])
choices_to_process = json.loads(choiceQuestionFrom.cleaned_data["answer_choices"])

for c in choices_to_process:
if c is not None and len(c) > 0:
Expand Down
15 changes: 13 additions & 2 deletions dojo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1737,10 +1737,13 @@ def _get_object_name(obj):


@app.task
def async_delete_task(obj, **kwargs):
def async_delete_task(model_label, pk, **kwargs):
"""
Delete an object and all its related objects using the SQL cascade walker.

Takes ``(model_label, pk)`` (e.g. ``("dojo.product", 42)``) so the task
arguments are JSON-serializable. The instance is refetched in the worker.

Handles Python-level concerns (duplicates, integrators, M2M, file cleanup,
product grading) explicitly, then uses cascade_delete_related_objects() for
efficient bottom-up SQL deletion of all FK-related tables. The top-level
Expand All @@ -1749,12 +1752,20 @@ def async_delete_task(obj, **kwargs):
Accepts **kwargs for _pgh_context injected by dojo_dispatch_task.
Uses PgHistoryTask base class (default) to preserve pghistory context for audit trail.
"""
from django.apps import apps # noqa: PLC0415

from dojo.finding.helper import ( # noqa: PLC0415 circular import
bulk_delete_findings,
prepare_duplicates_for_delete,
)
from dojo.utils_cascade_delete import cascade_delete_related_objects # noqa: PLC0415 circular import

Model = apps.get_model(model_label)
obj = Model.objects.filter(pk=pk).first()
if obj is None:
logger.info("ASYNC_DELETE: %s pk=%s already gone, nothing to do", model_label, pk)
return

logger.debug("ASYNC_DELETE: Deleting %s: %s", _get_object_name(obj), obj)
if not isinstance(obj, ASYNC_DELETE_SUPPORTED_TYPES):
logger.debug("ASYNC_DELETE: %s async delete not supported. Deleting normally: %s", _get_object_name(obj), obj)
Expand Down Expand Up @@ -1839,7 +1850,7 @@ def delete(self, obj, **kwargs):
"""
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import

dojo_dispatch_task(async_delete_task, obj, **kwargs)
dojo_dispatch_task(async_delete_task, obj._meta.label_lower, obj.pk, **kwargs)

@staticmethod
def get_object_name(obj):
Expand Down
Loading
Loading