Skip to content

Commit 0ffcacc

Browse files
valentijnscholtenValentijn Scholten
andauthored
pghistory: add context for each process and celery tasks (#13988)
* pghistory: add context for each process * pghistory: pass on context to celery tasks * support vue as source --------- Co-authored-by: Valentijn Scholten <valentijn.scholten@iodigital.com>
1 parent c35e8fa commit 0ffcacc

14 files changed

Lines changed: 350 additions & 115 deletions

File tree

dojo/api_v2/views.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from datetime import datetime
55
from pathlib import Path
66

7+
import pghistory
78
import tagulous
89
from crum import get_current_user
910
from dateutil.relativedelta import relativedelta
@@ -2530,7 +2531,17 @@ def perform_create(self, serializer):
25302531
if jira_project := (jira_helper.get_jira_project(jira_driver) if jira_driver else None):
25312532
push_to_jira = push_to_jira or jira_project.push_all_issues
25322533

2534+
# Add pghistory context for audit trail (adds to existing middleware context).
2535+
# /api/vue is the Pro UI
2536+
source = "import_vue" if "/api/vue/" in self.request.path else "import_api"
2537+
pghistory.context(
2538+
source=source,
2539+
scan_type=serializer.validated_data.get("scan_type"),
2540+
)
25332541
serializer.save(push_to_jira=push_to_jira)
2542+
# Add test_id to pghistory context now that test is created
2543+
if test_id := serializer.data.get("test"):
2544+
pghistory.context(test_id=test_id)
25342545

25352546
def get_queryset(self):
25362547
return get_authorized_tests(Permissions.Import_Scan_Result)
@@ -2678,7 +2689,22 @@ def perform_create(self, serializer):
26782689
if jira_project := (jira_helper.get_jira_project(jira_driver) if jira_driver else None):
26792690
push_to_jira = push_to_jira or jira_project.push_all_issues
26802691
logger.debug("push_to_jira: %s", push_to_jira)
2692+
# Add pghistory context for audit trail (adds to existing middleware context)
2693+
# For reimport, test may already exist or be created during save
2694+
test_id = test.id if test else serializer.validated_data.get("test", {})
2695+
if hasattr(test_id, "id"):
2696+
test_id = test_id.id
2697+
# /api/vue is the Pro UI
2698+
source = "reimport_vue" if "/api/vue/" in self.request.path else "reimport_api"
2699+
pghistory.context(
2700+
source=source,
2701+
test_id=test_id if isinstance(test_id, int) else None,
2702+
scan_type=serializer.validated_data.get("scan_type"),
2703+
)
26812704
serializer.save(push_to_jira=push_to_jira)
2705+
# Update test_id if it wasn't available before save
2706+
if test_id_from_response := serializer.data.get("test"):
2707+
pghistory.context(test_id=test_id_from_response)
26822708

26832709

26842710
# Authorization: configuration

dojo/celery.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os
33
from logging.config import dictConfig
44

5-
from celery import Celery
5+
from celery import Celery, Task
66
from celery.signals import setup_logging
77
from django.conf import settings
88

@@ -11,7 +11,31 @@
1111
# set the default Django settings module for the 'celery' program.
1212
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "dojo.settings.settings")
1313

14-
app = Celery("dojo")
14+
15+
class PgHistoryTask(Task):
16+
17+
"""
18+
Custom Celery base task that automatically applies pghistory context.
19+
20+
When a task is dispatched via dojo_async_task, the current pghistory
21+
context is captured and passed in kwargs as "_pgh_context". This base
22+
class extracts that context and applies it before running the task,
23+
ensuring all database events share the same context as the original
24+
request.
25+
"""
26+
27+
def __call__(self, *args, **kwargs):
28+
# Import here to avoid circular imports during Celery startup
29+
from dojo.pghistory_utils import get_pghistory_context_manager # noqa: PLC0415
30+
31+
# Extract context from kwargs (won't be passed to task function)
32+
pgh_context = kwargs.pop("_pgh_context", None)
33+
34+
with get_pghistory_context_manager(pgh_context):
35+
return super().__call__(*args, **kwargs)
36+
37+
38+
app = Celery("dojo", task_cls=PgHistoryTask)
1539

1640
# Using a string here means the worker will not have to
1741
# pickle the object when using Windows.

dojo/decorators.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,17 @@ def dojo_async_task(func=None, *, signature=False):
8383
def decorator(func):
8484
@wraps(func)
8585
def __wrapper__(*args, **kwargs):
86+
from dojo.pghistory_utils import get_serializable_pghistory_context # noqa: PLC0415 circular import
8687
from dojo.utils import get_current_user # noqa: PLC0415 circular import
88+
8789
user = get_current_user()
8890
kwargs["async_user"] = user
8991

92+
# Capture pghistory context to pass to Celery worker
93+
# The PgHistoryTask base class will apply this context in the worker
94+
if pgh_context := get_serializable_pghistory_context():
95+
kwargs["_pgh_context"] = pgh_context
96+
9097
dojo_async_task_counter.incr(
9198
func.__name__,
9299
args=args,

dojo/engagement/views.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from tempfile import NamedTemporaryFile
1111
from time import strftime
1212

13+
import pghistory
1314
from django.conf import settings
1415
from django.contrib import messages
1516
from django.contrib.admin.utils import NestedObjects
@@ -1138,10 +1139,18 @@ def post(
11381139
if form_error := self.process_form(request, context.get("form"), context):
11391140
add_error_message_to_response(form_error)
11401141
return self.failure_redirect(request, context)
1142+
# Add pghistory context for audit trail (adds to existing middleware context)
1143+
pghistory.context(
1144+
source="import",
1145+
scan_type=context.get("scan_type"),
1146+
)
11411147
# Kick off the import process
11421148
if import_error := self.import_findings(context):
11431149
add_error_message_to_response(import_error)
11441150
return self.failure_redirect(request, context)
1151+
# Add test_id to pghistory context now that test is created
1152+
if test := context.get("test"):
1153+
pghistory.context(test_id=test.id)
11451154
# Process the credential form
11461155
if form_error := self.process_credentials_form(request, context.get("cred_form"), context):
11471156
add_error_message_to_response(form_error)

dojo/finding/views.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from itertools import chain
1010
from pathlib import Path
1111

12+
import pghistory
1213
from django.conf import settings
1314
from django.contrib import messages
1415
from django.core import serializers
@@ -2557,6 +2558,11 @@ def finding_bulk_update_all(request, pid=None):
25572558
logger.debug("bulk 20")
25582559

25592560
finding_to_update = request.POST.getlist("finding_to_update")
2561+
# Add pghistory context for audit trail (adds to existing middleware context)
2562+
pghistory.context(
2563+
source="bulk_edit",
2564+
finding_count=len(finding_to_update),
2565+
)
25602566
finds = Finding.objects.filter(id__in=finding_to_update).order_by("id")
25612567
total_find_count = finds.count()
25622568
prods = set(find.test.engagement.product for find in finds) # noqa: C401

dojo/jira_link/views.py

Lines changed: 89 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import re
66

77
# Third party imports
8+
import pghistory
89
from django.contrib import messages
910
from django.contrib.admin.utils import NestedObjects
1011
from django.core.exceptions import PermissionDenied
@@ -85,85 +86,98 @@ def webhook(request, secret=None):
8586
if request.content_type != "application/json":
8687
return webhook_responser_handler("debug", "only application/json supported")
8788
# Time to process the request
89+
# Parse the JSON first to get webhook event type for context
8890
try:
8991
parsed = json.loads(request.body.decode("utf-8"))
90-
# Check if the events supplied are supported
91-
if parsed.get("webhookEvent") not in {"comment_created", "jira:issue_updated"}:
92-
return webhook_responser_handler("info", f"Unrecognized JIRA webhook event received: {parsed.get('webhookEvent')}")
93-
94-
if parsed.get("webhookEvent") == "jira:issue_updated":
95-
# xml examples at the end of file
96-
jid = parsed["issue"]["id"]
97-
# This may raise a 404, but it will be handled in the exception response
98-
try:
99-
jissue = JIRA_Issue.objects.get(jira_id=jid)
100-
except JIRA_Instance.DoesNotExist:
101-
return webhook_responser_handler("info", f"JIRA issue {jid} is not linked to a DefectDojo Finding")
102-
findings = None
103-
# Determine what type of object we will be working with
104-
if jissue.finding:
105-
logger.debug(f"Received issue update for {jissue.jira_key} for finding {jissue.finding.id}")
106-
findings = [jissue.finding]
107-
elif jissue.finding_group:
108-
logger.debug(f"Received issue update for {jissue.jira_key} for finding group {jissue.finding_group}")
109-
findings = jissue.finding_group.findings.all()
110-
elif jissue.engagement:
111-
return webhook_responser_handler("debug", "Update for engagement ignored")
112-
else:
113-
return webhook_responser_handler("info", f"Received issue update for {jissue.jira_key} for unknown object")
114-
# Process the assignee if present
115-
assignee = parsed["issue"]["fields"].get("assignee")
116-
assignee_name = "Jira User"
117-
if assignee is not None:
118-
# First look for the 'name' field. If not present, try 'displayName'. Else put None
119-
assignee_name = assignee.get("name", assignee.get("displayName"))
120-
121-
# "resolution":{
122-
# "self":"http://www.testjira.com/rest/api/2/resolution/11",
123-
# "id":"11",
124-
# "description":"Cancelled by the customer.",
125-
# "name":"Cancelled"
126-
# },
127-
128-
# or
129-
# "resolution": null
130-
131-
# or
132-
# "resolution": "None"
133-
134-
resolution = parsed["issue"]["fields"]["resolution"]
135-
resolution = resolution if resolution and resolution != "None" else None
136-
resolution_id = resolution["id"] if resolution else None
137-
resolution_name = resolution["name"] if resolution else None
138-
jira_now = parse_datetime(parsed["issue"]["fields"]["updated"])
139-
140-
if findings:
141-
for finding in findings:
142-
jira_helper.process_resolution_from_jira(finding, resolution_id, resolution_name, assignee_name, jira_now, jissue, finding_group=jissue.finding_group)
143-
# Check for any comment that could have come along with the resolution
144-
if (error_response := check_for_and_create_comment(parsed)) is not None:
145-
return error_response
146-
147-
if parsed.get("webhookEvent") == "comment_created":
148-
if (error_response := check_for_and_create_comment(parsed)) is not None:
149-
return error_response
150-
15192
except Exception as e:
152-
# Check if the issue is originally a 404
153-
if isinstance(e, Http404):
154-
return webhook_responser_handler("debug", str(e))
155-
# Try to get a little more information on the exact exception
93+
return webhook_responser_handler("debug", f"Failed to parse JSON: {e}")
94+
95+
# Check if the events supplied are supported
96+
if parsed.get("webhookEvent") not in {"comment_created", "jira:issue_updated"}:
97+
return webhook_responser_handler("info", f"Unrecognized JIRA webhook event received: {parsed.get('webhookEvent')}")
98+
99+
# Wrap processing with pghistory context for audit trail
100+
# JIRA webhooks don't have a user session, so we create a new context
101+
with pghistory.context(
102+
source="jira_webhook",
103+
jira_event=parsed.get("webhookEvent"),
104+
):
156105
try:
157-
message = (
158-
f"Original Exception: {e}\n"
159-
f"jira webhook body parsed:\n{json.dumps(parsed, indent=4)}"
160-
)
161-
except Exception:
162-
message = (
163-
f"Original Exception: {e}\n"
164-
f"jira webhook body :\n{request.body.decode('utf-8')}"
165-
)
166-
return webhook_responser_handler("debug", message)
106+
if parsed.get("webhookEvent") == "jira:issue_updated":
107+
# xml examples at the end of file
108+
jid = parsed["issue"]["id"]
109+
# This may raise a 404, but it will be handled in the exception response
110+
try:
111+
jissue = JIRA_Issue.objects.get(jira_id=jid)
112+
except JIRA_Instance.DoesNotExist:
113+
return webhook_responser_handler("info", f"JIRA issue {jid} is not linked to a DefectDojo Finding")
114+
# Add jira_key to context now that we have it
115+
pghistory.context(jira_key=jissue.jira_key)
116+
findings = None
117+
# Determine what type of object we will be working with
118+
if jissue.finding:
119+
logger.debug(f"Received issue update for {jissue.jira_key} for finding {jissue.finding.id}")
120+
findings = [jissue.finding]
121+
elif jissue.finding_group:
122+
logger.debug(f"Received issue update for {jissue.jira_key} for finding group {jissue.finding_group}")
123+
findings = jissue.finding_group.findings.all()
124+
elif jissue.engagement:
125+
return webhook_responser_handler("debug", "Update for engagement ignored")
126+
else:
127+
return webhook_responser_handler("info", f"Received issue update for {jissue.jira_key} for unknown object")
128+
# Process the assignee if present
129+
assignee = parsed["issue"]["fields"].get("assignee")
130+
assignee_name = "Jira User"
131+
if assignee is not None:
132+
# First look for the 'name' field. If not present, try 'displayName'. Else put None
133+
assignee_name = assignee.get("name", assignee.get("displayName"))
134+
135+
# "resolution":{
136+
# "self":"http://www.testjira.com/rest/api/2/resolution/11",
137+
# "id":"11",
138+
# "description":"Cancelled by the customer.",
139+
# "name":"Cancelled"
140+
# },
141+
142+
# or
143+
# "resolution": null
144+
145+
# or
146+
# "resolution": "None"
147+
148+
resolution = parsed["issue"]["fields"]["resolution"]
149+
resolution = resolution if resolution and resolution != "None" else None
150+
resolution_id = resolution["id"] if resolution else None
151+
resolution_name = resolution["name"] if resolution else None
152+
jira_now = parse_datetime(parsed["issue"]["fields"]["updated"])
153+
154+
if findings:
155+
for finding in findings:
156+
jira_helper.process_resolution_from_jira(finding, resolution_id, resolution_name, assignee_name, jira_now, jissue, finding_group=jissue.finding_group)
157+
# Check for any comment that could have come along with the resolution
158+
if (error_response := check_for_and_create_comment(parsed)) is not None:
159+
return error_response
160+
161+
if parsed.get("webhookEvent") == "comment_created":
162+
if (error_response := check_for_and_create_comment(parsed)) is not None:
163+
return error_response
164+
165+
except Exception as e:
166+
# Check if the issue is originally a 404
167+
if isinstance(e, Http404):
168+
return webhook_responser_handler("debug", str(e))
169+
# Try to get a little more information on the exact exception
170+
try:
171+
message = (
172+
f"Original Exception: {e}\n"
173+
f"jira webhook body parsed:\n{json.dumps(parsed, indent=4)}"
174+
)
175+
except Exception:
176+
message = (
177+
f"Original Exception: {e}\n"
178+
f"jira webhook body :\n{request.body.decode('utf-8')}"
179+
)
180+
return webhook_responser_handler("debug", message)
167181

168182
return webhook_responser_handler("No logging here", "Success!")
169183

dojo/management/commands/dedupe.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22

3+
import pghistory
34
from django.conf import settings
45
from django.core.management.base import BaseCommand
56
from django.db.models import Prefetch
@@ -64,6 +65,21 @@ def handle(self, *args, **options):
6465
dedupe_sync = options["dedupe_sync"]
6566
dedupe_batch_mode = options.get("dedupe_batch_mode", True) # Default to True (batch mode enabled)
6667

68+
# Wrap with pghistory context for audit trail
69+
with pghistory.context(
70+
source="dedupe_command",
71+
dedupe_sync=dedupe_sync,
72+
):
73+
self._run_dedupe(
74+
restrict_to_parsers=restrict_to_parsers,
75+
hash_code_only=hash_code_only,
76+
dedupe_only=dedupe_only,
77+
dedupe_sync=dedupe_sync,
78+
dedupe_batch_mode=dedupe_batch_mode,
79+
)
80+
81+
def _run_dedupe(self, *, restrict_to_parsers, hash_code_only, dedupe_only, dedupe_sync, dedupe_batch_mode):
82+
"""Internal method to run the dedupe logic within pghistory context."""
6783
if restrict_to_parsers is not None:
6884
findings = Finding.objects.filter(test__test_type__name__in=restrict_to_parsers).exclude(duplicate=True)
6985
logger.info("######## Will process only parsers %s and %d findings ########", *restrict_to_parsers, findings.count())

dojo/management/commands/jira_status_reconciliation.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import logging
22

3+
import pghistory
34
from dateutil.relativedelta import relativedelta
45
from django.conf import settings
56
from django.core.management.base import BaseCommand
@@ -216,10 +217,9 @@ def add_arguments(self, parser):
216217
parser.add_argument("--dryrun", action="store_true", help="Only print actions to be performed, but make no modifications.")
217218

218219
def handle(self, *args, **options):
219-
# mode = options['mode']
220-
# product = options['product']
221-
# engagement = options['engagement']
222-
# daysback = options['daysback']
223-
# dryrun = options['dryrun']
224-
225-
return jira_status_reconciliation(*args, **options)
220+
# Wrap with pghistory context for audit trail
221+
with pghistory.context(
222+
source="jira_reconciliation",
223+
mode=options.get("mode", "reconcile"),
224+
):
225+
return jira_status_reconciliation(*args, **options)

0 commit comments

Comments
 (0)