
Commit c1981f3

Maffoochclauderossops authored
Remove pickle from forms and Celery serializer (#14791)
* security: remove pickle from forms and Celery serializer

Pickle deserialization on attacker-controllable bytes is arbitrary code execution. Two sites used pickle:

- The survey choice-question form pickled/unpickled a list of strings through MultiExampleField.compress / MultiWidgetBasic.decompress and pickle.loads in survey/views.py. Switched to json — the data is just a list of up to 6 strings.
- Celery defaulted to the pickle serializer, with CELERY_ACCEPT_CONTENT including pickle/yaml/msgpack, so dispatch sites could pass Django model instances and a Dojo_User on the wire.

Made every task argument JSON-serializable: async_delete_task takes (model_label, pk) and refetches; the SLA recompute takes (sla_config_id, product_ids); per-channel notification sends run inline inside the surrounding async_create_notification task instead of dispatching an inner Celery task with model instances; user context is injected as async_user_id and resolved in the worker.

Flipped the DD_CELERY_TASK_SERIALIZER default to json, tightened CELERY_ACCEPT_CONTENT to ["json"], and added CELERY_RESULT_SERIALIZER = "json".

Added unittests/test_no_pickle.py as a regression net (asserts no pickle imports in dojo/ and that the Celery serializer settings stay JSON-only) and unittests/test_survey_forms.py for the widget round-trip.

Operator note: workers running this version reject in-flight pickled messages with ContentDisallowed. Drain the broker (`celery -A dojo purge -f`) or scale workers to zero before deploy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test: update importer perf-test query counts after pickle removal

The async_user_id user resolution and the refetch in async_delete_task / the SLA recompute add 1-6 queries per scenario; CI auto-generated the new expected counts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(sla): reset in-memory async_updating after dispatch

Before the pickle removal, the SLA recompute task received the SLA_Configuration / Product instances by reference (Celery's sync .apply() does not serialize). The inner update function set async_updating=False on those shared instances, so the dispatcher's local self.async_updating ended up False as well.

After switching the dispatch to pass IDs and refetch in the task, the inner function only resets async_updating on its refetched copies. The dispatcher's in-memory self.async_updating stayed True, so a subsequent save() on the same instance triggered the lock-revert path at SLA_Configuration.save() line 1058 and overwrote the caller's field changes (e.g. enforce_critical) with the DB values. This manifested as test_sla_expiration_date_after_sla_not_enforced failing: sla_config.enforce_critical=False was reverted to True on save.

Reset async_updating on the in-memory caller instances after dispatch returns to keep them consistent with the post-task DB state.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Adding try/except to channels

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Ross Esposito <rossespo@gmail.com>
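The security rationale in the commit message can be demonstrated in a few lines: pickle.loads runs an attacker-chosen callable via `__reduce__`, while json.loads can only ever produce plain data. The `Evil` class and payload below are illustrative, not DefectDojo code:

```python
import json
import pickle

class Evil:
    """pickle.loads calls the callable returned by __reduce__ with the
    given args; a real payload would use os.system or similar instead
    of the harmless print used here."""
    def __reduce__(self):
        return (print, ("code ran during pickle.loads",))

payload = pickle.dumps(Evil())
result = pickle.loads(payload)  # executes print(...) during deserialization

# The same bytes are inert under JSON: parsing can only yield
# dicts/lists/strings/numbers/bools/None, never a function call.
try:
    json.loads(payload)
except ValueError:
    print("json rejected the payload without executing anything")
```

This is the asymmetry the commit relies on: switching the serializer removes the code-execution channel entirely rather than trying to sanitize it.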
1 parent 47b993c commit c1981f3

14 files changed

Lines changed: 170 additions & 73 deletions


dojo/celery.py

Lines changed: 14 additions & 12 deletions
```diff
@@ -28,23 +28,26 @@ def __call__(self, *args, **kwargs):
         """
         Restore user context in the celery worker via crum.impersonate.

-        The apply_async method injects ``async_user`` into kwargs when a task
-        is dispatched. Here we pop it and set it as the current user in
-        thread-local storage so that all downstream code — including nested
-        dojo_dispatch_task calls — sees the correct user via
-        get_current_user().
+        The apply_async method injects ``async_user_id`` into kwargs when a task
+        is dispatched. Here we pop it, resolve to a user instance, and set it
+        as the current user in thread-local storage so that all downstream
+        code — including nested dojo_dispatch_task calls — sees the correct
+        user via get_current_user().

-        When a task is called directly (not via apply_async), async_user is
+        When a task is called directly (not via apply_async), async_user_id is
         not in kwargs. In that case we leave the existing crum context
         intact so that callers who already set a user (e.g. via
         crum.impersonate in tests or request middleware) are not disrupted.
         """
-        if "async_user" not in kwargs:
+        if "async_user_id" not in kwargs:
             return super().__call__(*args, **kwargs)

         import crum  # noqa: PLC0415

-        user = kwargs.pop("async_user")
+        from dojo.models import Dojo_User  # noqa: PLC0415 circular import
+
+        user_id = kwargs.pop("async_user_id")
+        user = Dojo_User.objects.filter(pk=user_id).first() if user_id else None
         with crum.impersonate(user):
             return super().__call__(*args, **kwargs)

@@ -59,8 +62,9 @@ def apply_async(self, args=None, kwargs=None, **options):
         # Inject user context for Dojo tasks only. Celery built-in tasks (e.g.
         # celery.backend_cleanup) do not accept custom kwargs.
         task_name = self.name or ""
-        if not task_name.startswith("celery.") and "async_user" not in kwargs:
-            kwargs["async_user"] = get_current_user()
+        if not task_name.startswith("celery.") and "async_user_id" not in kwargs:
+            user = get_current_user()
+            kwargs["async_user_id"] = user.id if user else None

         # Control flag used for sync/async decision; never pass into the task itself
         kwargs.pop("sync", None)

@@ -135,8 +139,6 @@ def __call__(self, *args, **kwargs):

 app = Celery("dojo", task_cls=PgHistoryTask)

-# Using a string here means the worker will not have to
-# pickle the object when using Windows.
 app.config_from_object("django.conf:settings", namespace="CELERY")

 app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
```

dojo/celery_dispatch.py

Lines changed: 4 additions & 3 deletions
```diff
@@ -18,10 +18,11 @@ def apply_async(self, args: Any | None = None, kwargs: Any | None = None, **opti

 def _inject_async_user(kwargs: Mapping[str, Any] | None) -> dict[str, Any]:
     result: dict[str, Any] = dict(kwargs or {})
-    if "async_user" not in result:
+    if "async_user_id" not in result:
         from dojo.utils import get_current_user  # noqa: PLC0415 circular import

-        result["async_user"] = get_current_user()
+        user = get_current_user()
+        result["async_user_id"] = user.id if user else None
     return result

@@ -58,7 +59,7 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur
     """
     Dispatch a task/signature using DefectDojo semantics.

-    - Inject `async_user` if missing.
+    - Inject `async_user_id` if missing.
     - Capture and inject pghistory context if available.
     - Respect `sync=True` (foreground execution) and user `block_execution`.
     - Support `countdown=<seconds>` for async dispatch.
```
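The dispatch-side half of the contract is small enough to sketch standalone. `FakeUser` and the lambdas below are hypothetical stand-ins for `get_current_user()`:

```python
def inject_async_user(kwargs, get_current_user):
    """Sketch of _inject_async_user: copy kwargs and add async_user_id only
    when the caller has not already set it. Only the integer pk crosses the
    JSON-serialized Celery boundary, never the user object itself."""
    result = dict(kwargs or {})
    if "async_user_id" not in result:
        user = get_current_user()
        result["async_user_id"] = user.id if user else None
    return result

class FakeUser:
    id = 7

print(inject_async_user({}, lambda: FakeUser()))                    # {'async_user_id': 7}
print(inject_async_user({}, lambda: None))                          # {'async_user_id': None}
print(inject_async_user({"async_user_id": 3}, lambda: FakeUser()))  # caller's value wins
```

The not-already-set check is what lets nested dojo_dispatch_task calls keep the originally injected user instead of re-resolving it per hop.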

dojo/decorators.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -88,7 +88,7 @@ def __wrapper__(*args, **kwargs):
         from dojo.utils import get_current_user  # noqa: PLC0415 circular import

         user = get_current_user()
-        kwargs["async_user"] = user
+        kwargs["async_user_id"] = user.id if user else None

         # Capture pghistory context to pass to Celery worker
         # The PgHistoryTask base class will apply this context in the worker
```

dojo/forms.py

Lines changed: 3 additions & 3 deletions
```diff
@@ -1,5 +1,5 @@
+import json
 import logging
-import pickle
 import re
 import warnings
 from datetime import date, datetime

@@ -3477,7 +3477,7 @@ def __init__(self, attrs=None):

     def decompress(self, value):
         if value:
-            return pickle.loads(value)
+            return json.loads(value)
         return [None, None, None, None, None, None]

     def format_output(self, rendered_widgets):

@@ -3497,7 +3497,7 @@ def __init__(self, *args, **kwargs):
         super().__init__(list_fields, *args, **kwargs)

     def compress(self, values):
-        return pickle.dumps(values)
+        return json.dumps(values)


 class CreateChoiceQuestionForm(forms.Form):
```
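The widget's new round-trip is plain JSON over a list of up to six strings. A minimal standalone sketch of the compress/decompress pair (free functions here; methods on the real field and widget):

```python
import json

def compress(values):
    """MultiExampleField.compress stand-in: list of strings -> JSON text."""
    return json.dumps(values)

def decompress(value):
    """MultiWidgetBasic.decompress stand-in: JSON text -> list, or six
    empty slots when the form has no initial data."""
    if value:
        return json.loads(value)
    return [None, None, None, None, None, None]

choices = ["Yes", "No", "Maybe", None, None, None]
stored = compress(choices)
print(stored)  # a readable JSON string, not opaque pickle bytes
print(decompress(stored) == choices)
```

One behavioral difference worth noting: compress now yields str rather than bytes, and JSON round-trips a list of strings and Nones exactly, which is all this field ever holds.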

dojo/models.py

Lines changed: 21 additions & 2 deletions
```diff
@@ -1093,7 +1093,16 @@ def save(self, *args, **kwargs):
             from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync  # noqa: I001, PLC0415 circular import
             from dojo.celery_dispatch import dojo_dispatch_task  # noqa: PLC0415 circular import

-            dojo_dispatch_task(async_update_sla_expiration_dates_sla_config_sync, self, products, severities=severities)
+            dojo_dispatch_task(
+                async_update_sla_expiration_dates_sla_config_sync,
+                self.id,
+                list(products.values_list("id", flat=True)),
+                severities=severities,
+            )
+            # The async task refetches and resets async_updating on its own copy.
+            # Mirror that on this in-memory instance so a subsequent save() on the
+            # same instance does not trigger the lock-revert path at line 1058.
+            self.async_updating = False

     def clean(self):
         sla_days = [self.critical, self.high, self.medium, self.low]

@@ -1255,7 +1264,17 @@ def save(self, *args, **kwargs):
             from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync  # noqa: I001, PLC0415 circular import
             from dojo.celery_dispatch import dojo_dispatch_task  # noqa: PLC0415 circular import

-            dojo_dispatch_task(async_update_sla_expiration_dates_sla_config_sync, sla_config, Product.objects.filter(id=self.id))
+            dojo_dispatch_task(
+                async_update_sla_expiration_dates_sla_config_sync,
+                sla_config.id,
+                [self.id],
+            )
+            # The async task refetches and resets async_updating on its own copies.
+            # Mirror that on this in-memory product and the in-memory sla_config so a
+            # subsequent save() on either does not trigger their lock-revert paths.
+            self.async_updating = False
+            if sla_config:
+                sla_config.async_updating = False

     def get_absolute_url(self):
         return reverse("view_product", args=[str(self.id)])
```
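The reason for the trailing `self.async_updating = False` is easiest to see in a toy model. Nothing below is DefectDojo code: a dict stands in for the database row and `save()` mimics only the lock-revert branch described in the fix(sla) commit message:

```python
DB = {"enforce_critical": True, "async_updating": False}

class SlaConfig:
    """Toy in-memory instance mirroring one DB row."""
    def __init__(self):
        self.enforce_critical = DB["enforce_critical"]
        self.async_updating = DB["async_updating"]

    def save(self):
        if self.async_updating:
            # lock-revert path: while a recompute is thought to be running,
            # discard local field edits in favor of the DB values
            self.enforce_critical = DB["enforce_critical"]
            return
        DB["enforce_critical"] = self.enforce_critical

def dispatch_recompute(cfg):
    cfg.async_updating = True
    # ... the task runs elsewhere on a refetched copy and clears the DB flag ...
    # The fix: also clear the caller's in-memory flag after dispatch, so the
    # instance agrees with the post-task DB state.
    cfg.async_updating = False

cfg = SlaConfig()
dispatch_recompute(cfg)
cfg.enforce_critical = False
cfg.save()
print(DB["enforce_critical"])  # False: the caller's edit survives
```

Drop the last line of `dispatch_recompute` and the subsequent save() takes the revert branch instead, which is exactly the test_sla_expiration_date_after_sla_not_enforced failure mode.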

dojo/notifications/helper.py

Lines changed: 21 additions & 25 deletions
```diff
@@ -18,7 +18,6 @@

 from dojo import __version__ as dd_version
 from dojo.authorization.roles_permissions import Permissions
-from dojo.celery_dispatch import dojo_dispatch_task
 from dojo.decorators import we_want_async
 from dojo.labels import get_labels
 from dojo.models import (

@@ -834,58 +833,55 @@ def _process_notifications(

         # Some errors should not be pushed to all channels, only to alerts.
         # For example reasons why JIRA Issues: https://github.com/DefectDojo/django-DefectDojo/issues/11575
+        # Per-channel sends run synchronously inside the surrounding async_create_notification
+        # task body. Dispatching inner Celery tasks would require JSON-serializable kwargs, but
+        # callers pass model instances (finding/test/engagement/product/...) and refetching every
+        # one of them per channel would multiply DB queries; running synchronously avoids both.
         if not alert_only:
+            user_id = getattr(notifications.user, "id", None)
             if self.system_settings.enable_slack_notifications and "slack" in getattr(
                 notifications,
                 event,
                 notifications.other,
             ):
                 logger.debug("Sending Slack Notification")
-                dojo_dispatch_task(
-                    send_slack_notification,
-                    event,
-                    user_id=getattr(notifications.user, "id", None),
-                    **kwargs,
-                )
+                try:
+                    send_slack_notification.run(event, user_id=user_id, **kwargs)
+                except Exception:
+                    logger.exception("Failed to send Slack notification for event %s", event)

             if self.system_settings.enable_msteams_notifications and "msteams" in getattr(
                 notifications,
                 event,
                 notifications.other,
             ):
                 logger.debug("Sending MSTeams Notification")
-                dojo_dispatch_task(
-                    send_msteams_notification,
-                    event,
-                    user_id=getattr(notifications.user, "id", None),
-                    **kwargs,
-                )
+                try:
+                    send_msteams_notification.run(event, user_id=user_id, **kwargs)
+                except Exception:
+                    logger.exception("Failed to send MSTeams notification for event %s", event)

             if self.system_settings.enable_mail_notifications and "mail" in getattr(
                 notifications,
                 event,
                 notifications.other,
             ):
                 logger.debug("Sending Mail Notification")
-                dojo_dispatch_task(
-                    send_mail_notification,
-                    event,
-                    user_id=getattr(notifications.user, "id", None),
-                    **kwargs,
-                )
+                try:
+                    send_mail_notification.run(event, user_id=user_id, **kwargs)
+                except Exception:
+                    logger.exception("Failed to send Mail notification for event %s", event)

             if self.system_settings.enable_webhooks_notifications and "webhooks" in getattr(
                 notifications,
                 event,
                 notifications.other,
             ):
                 logger.debug("Sending Webhooks Notification")
-                dojo_dispatch_task(
-                    send_webhooks_notification,
-                    event,
-                    user_id=getattr(notifications.user, "id", None),
-                    **kwargs,
-                )
+                try:
+                    send_webhooks_notification.run(event, user_id=user_id, **kwargs)
+                except Exception:
+                    logger.exception("Failed to send Webhooks notification for event %s", event)


 def process_tag_notifications(request, note, parent_url, parent_title):
```
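The try/except around each channel means one broken integration no longer prevents the others from delivering. A minimal sketch with made-up senders standing in for the real send_*_notification tasks:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("notifications")

def send_slack(event):
    # illustrative failing channel, not the real sender
    raise RuntimeError("slack webhook down")

def send_mail(event):
    return f"mailed:{event}"

results = []
# Each channel is isolated: a failure is logged with its traceback and the
# loop moves on, mirroring the per-channel try/except in _process_notifications.
for name, sender in [("slack", send_slack), ("mail", send_mail)]:
    try:
        results.append(sender("scan_added"))
    except Exception:
        logger.exception("Failed to send %s notification", name)

print(results)  # ['mailed:scan_added']
```

logger.exception (rather than logger.error) keeps the traceback, which matters now that there is no Celery task failure record to inspect.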

dojo/notifications/tasks.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -38,7 +38,9 @@ def log_generic_alert(source, title, description):


 @app.task(bind=True)
-def add_alerts(self, runinterval, *args, **kwargs):
+def add_alerts(self, *args, **kwargs):
+    # Run interval matches the beat schedule for this task (see CELERY_BEAT_SCHEDULE).
+    runinterval = timedelta(hours=1)
     now = timezone.now()

     upcoming_engagements = Engagement.objects.filter(target_start__gt=now + timedelta(days=3), target_start__lt=now + timedelta(days=3) + runinterval).order_by("target_start")
```
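Hard-coding runinterval to the beat cadence keeps the scan windows contiguous: each run covers the hour-wide slice starting three days out, and the next run's slice begins exactly where this one ends. The dates below are arbitrary:

```python
from datetime import datetime, timedelta

runinterval = timedelta(hours=1)       # must equal the beat schedule cadence
now = datetime(2024, 5, 1, 12, 0)      # arbitrary "this run" timestamp

# Window scanned by this run: (now + 3d, now + 3d + 1h)
window_start = now + timedelta(days=3)
window_end = now + timedelta(days=3) + runinterval
print(window_start, "->", window_end)

# The next beat run, one hour later, starts its window where this one ended,
# so engagements are alerted exactly once with no gaps.
next_run = now + runinterval
assert next_run + timedelta(days=3) == window_end
```

This only holds while the beat schedule and the in-task constant agree; changing CELERY_BEAT_SCHEDULE without updating the task would reopen gaps or double alerts.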

dojo/settings/settings.dist.py

Lines changed: 3 additions & 3 deletions
```diff
@@ -107,7 +107,7 @@
     DD_CELERY_RESULT_BACKEND=(str, "django-db"),
     DD_CELERY_RESULT_EXPIRES=(int, 86400),
     DD_CELERY_BEAT_SCHEDULE_FILENAME=(str, root("dojo.celery.beat.db")),
-    DD_CELERY_TASK_SERIALIZER=(str, "pickle"),
+    DD_CELERY_TASK_SERIALIZER=(str, "json"),
     DD_CELERY_LOG_LEVEL=(str, "INFO"),
     # Hard ceiling on task runtime. When reached, the worker process is sent SIGKILL — no cleanup
     # code runs. Always set higher than DD_CELERY_TASK_SOFT_TIME_LIMIT. (0 = disabled, no limit)

@@ -847,8 +847,9 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param
 CELERY_TIMEZONE = TIME_ZONE
 CELERY_RESULT_EXPIRES = env("DD_CELERY_RESULT_EXPIRES")
 CELERY_BEAT_SCHEDULE_FILENAME = env("DD_CELERY_BEAT_SCHEDULE_FILENAME")
-CELERY_ACCEPT_CONTENT = ["pickle", "json", "msgpack", "yaml"]
+CELERY_ACCEPT_CONTENT = ["json"]
 CELERY_TASK_SERIALIZER = env("DD_CELERY_TASK_SERIALIZER")
+CELERY_RESULT_SERIALIZER = "json"
 CELERY_LOG_LEVEL = env("DD_CELERY_LOG_LEVEL")

 if env("DD_CELERY_TASK_TIME_LIMIT") > 0:

@@ -872,7 +873,6 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param
     "add-alerts": {
         "task": "dojo.notifications.tasks.add_alerts",
         "schedule": timedelta(hours=1),
-        "args": [timedelta(hours=1)],
         "options": {
             "expires": int(60 * 60 * 1 * 1.2),  # If a task is not executed within 72 minutes, it should be dropped from the queue. Two more tasks should be scheduled in the meantime.
         },
```
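For reference, the equivalent JSON-only pinning on a minimal standalone Celery app looks like this. The broker URL is a placeholder; DefectDojo itself loads these values from the Django settings shown above:

```python
from celery import Celery

# Minimal app pinned to JSON-only messages. Workers configured this way
# raise ContentDisallowed when handed an in-flight pickled message, which
# is why the broker should be drained before deploying this change.
app = Celery("dojo", broker="redis://localhost:6379/0")
app.conf.update(
    task_serializer="json",
    result_serializer="json",
    accept_content=["json"],
)
```

accept_content is the enforcement half: the serializer settings only control what this process emits, while accept_content controls what a worker will agree to deserialize at all.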

dojo/sla_config/helpers.py

Lines changed: 6 additions & 1 deletion
```diff
@@ -8,7 +8,12 @@


 @app.task
-def async_update_sla_expiration_dates_sla_config_sync(sla_config: SLA_Configuration, products: list[Product], *args, severities: list[str] | None = None, **kwargs):
+def async_update_sla_expiration_dates_sla_config_sync(sla_config_id: int, product_ids: list[int], *args, severities: list[str] | None = None, **kwargs):
+    sla_config = SLA_Configuration.objects.filter(pk=sla_config_id).first()
+    if sla_config is None:
+        logger.info("SLA_Configuration with id %s no longer exists, skipping update", sla_config_id)
+        return
+    products = Product.objects.filter(pk__in=product_ids)
     if method := get_custom_method("FINDING_SLA_EXPIRATION_CALCULATION_METHOD"):
         method(sla_config, products, severities=severities)
     else:
```
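The id-in, refetch, guard-if-gone shape generalizes to any task that previously took model instances. A toy version with a dict standing in for the table (all names illustrative, no ORM involved):

```python
STORE = {1: {"name": "Default SLA"}}  # stand-in for SLA_Configuration rows

def recompute(sla_config_id, product_ids):
    """Refetch by pk inside the task; if the row vanished between dispatch
    and execution, log-and-return rather than crash the worker."""
    sla_config = STORE.get(sla_config_id)
    if sla_config is None:
        return f"skipped: {sla_config_id} gone"
    return f"recomputed {sla_config['name']} for {len(product_ids)} products"

print(recompute(1, [10, 11]))  # recomputed Default SLA for 2 products
print(recompute(99, [10]))     # skipped: 99 gone
```

Refetching also means the task always operates on current DB state rather than a possibly stale serialized snapshot, at the cost of the extra queries noted in the perf-test commit.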

dojo/survey/views.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -1,4 +1,4 @@
-import pickle
+import json
 from datetime import date, timedelta

 from django.contrib import messages

@@ -490,7 +490,7 @@ def create_question(request):
                 order=form.cleaned_data["order"],
                 text=form.cleaned_data["text"],
                 multichoice=choiceQuestionFrom.cleaned_data["multichoice"])
-            choices_to_process = pickle.loads(choiceQuestionFrom.cleaned_data["answer_choices"])
+            choices_to_process = json.loads(choiceQuestionFrom.cleaned_data["answer_choices"])

             for c in choices_to_process:
                 if c is not None and len(c) > 0:
```
