-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Expand file tree
/
Copy pathviews.py
More file actions
3395 lines (2979 loc) · 139 KB
/
views.py
File metadata and controls
3395 lines (2979 loc) · 139 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# # findings
import base64
import contextlib
import copy
import logging
import mimetypes
from collections import OrderedDict, defaultdict
from itertools import chain
from pathlib import Path
import pghistory
from django.conf import settings
from django.contrib import messages
from django.core import serializers
from django.core.exceptions import PermissionDenied, ValidationError
from django.db import models
from django.db.models import Case, F, QuerySet, Value, When
from django.db.models.functions import Coalesce, ExtractDay, Length, TruncDate
from django.db.models.query import Prefetch
from django.http import Http404, HttpRequest, HttpResponse, HttpResponseRedirect, JsonResponse, StreamingHttpResponse
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils import timezone
from django.utils.safestring import mark_safe
from django.utils.translation import gettext as _
from django.views import View
from django.views.decorators.http import require_POST
from imagekit import ImageSpec
from imagekit.processors import ResizeToFill
import dojo.finding.helper as finding_helper
import dojo.risk_acceptance.helper as ra_helper
from dojo.authorization.authorization import user_has_global_permission_or_403, user_has_permission_or_403
from dojo.celery_dispatch import dojo_dispatch_task
from dojo.filters import (
AcceptedFindingFilter,
AcceptedFindingFilterWithoutObjectLookups,
FindingFilter,
FindingFilterWithoutObjectLookups,
SimilarFindingFilter,
SimilarFindingFilterWithoutObjectLookups,
TemplateFindingFilter,
TestImportFilter,
TestImportFindingActionFilter,
)
from dojo.finding.deduplication import (
_fetch_fp_candidates_for_batch,
do_false_positive_history_batch,
match_finding_to_existing_findings,
)
from dojo.finding.queries import get_authorized_findings, get_authorized_findings_for_queryset, prefetch_for_findings
from dojo.forms import (
ApplyFindingTemplateForm,
ClearFindingReviewForm,
CloseFindingForm,
CopyFindingForm,
DefectFindingForm,
DeleteFindingForm,
DeleteFindingTemplateForm,
EditPlannedRemediationDateFindingForm,
FindingBulkUpdateForm,
FindingForm,
FindingTemplateForm,
GITHUBFindingForm,
JIRAFindingForm,
MergeFindings,
NoteForm,
ReviewFindingForm,
TypedNoteForm,
)
from dojo.jira import services as jira_services
from dojo.location.status import FindingLocationStatus
from dojo.models import (
IMPORT_UNTOUCHED_FINDING,
BurpRawRequestResponse,
Dojo_User,
Endpoint,
Endpoint_Status,
Engagement,
FileAccessToken,
Finding,
Finding_Group,
Finding_Template,
GITHUB_Issue,
GITHUB_PKey,
Note_Type,
NoteHistory,
Notes,
Product,
System_Settings,
Test,
Test_Import,
Test_Import_Finding_Action,
User,
)
from dojo.notifications.helper import create_notification
from dojo.tags.utils import bulk_add_tags_to_instances
from dojo.test.queries import get_authorized_tests
from dojo.tools import tool_issue_updater
from dojo.utils import (
FileIterWrapper,
Product_Tab,
add_breadcrumb,
add_error_message_to_response,
add_external_issue,
add_field_errors_to_response,
add_success_message_to_response,
calculate_grade,
get_page_items,
get_page_items_and_count,
get_return_url,
get_system_setting,
get_visible_scan_types,
get_words_for_field,
process_tag_notifications,
redirect,
redirect_to_return_url_or_else,
reopen_external_issue,
update_external_issue,
)
JFORM_PUSH_TO_JIRA_MESSAGE = "jform.push_to_jira: %s"
logger = logging.getLogger(__name__)
def prefetch_for_similar_findings(findings):
prefetched_findings = findings
if isinstance(
findings, QuerySet,
): # old code can arrive here with prods being a list because the query was already executed
prefetched_findings = prefetched_findings.prefetch_related("reporter")
prefetched_findings = prefetched_findings.prefetch_related(
"jira_issue__jira_project__jira_instance",
)
prefetched_findings = prefetched_findings.prefetch_related("test__test_type")
prefetched_findings = prefetched_findings.prefetch_related(
"test__engagement__jira_project__jira_instance",
)
prefetched_findings = prefetched_findings.prefetch_related(
"test__engagement__product__jira_project_set__jira_instance",
)
prefetched_findings = prefetched_findings.prefetch_related("found_by")
prefetched_findings = prefetched_findings.prefetch_related(
"risk_acceptance_set",
)
prefetched_findings = prefetched_findings.prefetch_related(
"risk_acceptance_set__accepted_findings",
)
prefetched_findings = prefetched_findings.prefetch_related("original_finding")
prefetched_findings = prefetched_findings.prefetch_related("duplicate_finding")
# filter out noop reimport actions from finding status history
prefetched_findings = prefetched_findings.prefetch_related(
Prefetch(
"test_import_finding_action_set",
queryset=Test_Import_Finding_Action.objects.exclude(
action=IMPORT_UNTOUCHED_FINDING,
),
),
)
"""
we could try to prefetch only the latest note with SubQuery and OuterRef,
but I'm getting that MySql doesn't support limits in subqueries.
"""
prefetched_findings = prefetched_findings.prefetch_related("notes")
prefetched_findings = prefetched_findings.prefetch_related("tags")
prefetched_findings = prefetched_findings.prefetch_related(
"vulnerability_id_set",
)
else:
logger.debug("unable to prefetch because query was already executed")
return prefetched_findings
class BaseListFindings:
def __init__(
self,
filter_name: str = "All",
product_id: int | None = None,
engagement_id: int | None = None,
test_id: int | None = None,
order_by: str = "numerical_severity",
prefetch_type: str = "all",
):
self.filter_name = filter_name
self.product_id = product_id
self.engagement_id = engagement_id
self.test_id = test_id
self.order_by = order_by
self.prefetch_type = prefetch_type
def get_filter_name(self):
if not hasattr(self, "filter_name"):
self.filter_name = "All"
return self.filter_name
def get_order_by(self):
if not hasattr(self, "order_by"):
self.order_by = "numerical_severity"
return self.order_by
def get_prefetch_type(self):
if not hasattr(self, "prefetch_type"):
self.prefetch_type = "all"
return self.prefetch_type
def get_product_id(self):
if not hasattr(self, "product_id"):
self.product_id = None
return self.product_id
def get_engagement_id(self):
if not hasattr(self, "engagement_id"):
self.engagement_id = None
return self.engagement_id
def get_test_id(self):
if not hasattr(self, "test_id"):
self.test_id = None
return self.test_id
def filter_findings_by_object(self, findings: QuerySet[Finding]):
if product_id := self.get_product_id():
return findings.filter(test__engagement__product__id=product_id)
if engagement_id := self.get_engagement_id():
return findings.filter(test__engagement=engagement_id)
if test_id := self.get_test_id():
return findings.filter(test=test_id)
return findings
def filter_findings_by_filter_name(self, findings: QuerySet[Finding]):
filter_name = self.get_filter_name()
if filter_name == "Open":
return findings.filter(finding_helper.OPEN_FINDINGS_QUERY)
if filter_name == "Verified":
return findings.filter(finding_helper.VERIFIED_FINDINGS_QUERY)
if filter_name == "Out of Scope":
return findings.filter(finding_helper.OUT_OF_SCOPE_FINDINGS_QUERY)
if filter_name == "False Positive":
return findings.filter(finding_helper.FALSE_POSITIVE_FINDINGS_QUERY)
if filter_name == "Inactive":
return findings.filter(finding_helper.INACTIVE_FINDINGS_QUERY)
if filter_name == "Accepted":
return findings.filter(finding_helper.ACCEPTED_FINDINGS_QUERY)
if filter_name == "Closed":
return findings.filter(finding_helper.CLOSED_FINDINGS_QUERY)
return findings
def filter_findings_by_form(self, request: HttpRequest, findings: QuerySet[Finding]):
# Apply default ordering if no ordering parameter is provided
# This maintains backward compatibility with the previous behavior
if not request.GET.get("o"):
findings = findings.order_by(self.get_order_by())
# Set up the args for the form
args = [request.GET, findings]
# Set the initial form args
kwargs = {
"user": request.user,
"pid": self.get_product_id(),
"eid": self.get_engagement_id(),
"tid": self.get_test_id(),
}
filter_string_matching = get_system_setting("filter_string_matching", False)
finding_filter_class = FindingFilterWithoutObjectLookups if filter_string_matching else FindingFilter
accepted_finding_filter_class = AcceptedFindingFilterWithoutObjectLookups if filter_string_matching else AcceptedFindingFilter
return (
accepted_finding_filter_class(*args, **kwargs)
if self.get_filter_name() == "Accepted"
else finding_filter_class(*args, **kwargs)
)
def get_filtered_findings(self):
findings = get_authorized_findings("view")
# Annotate computed SLA age in days: sla_expiration_date - (sla_start_date or date)
# Handle NULL sla_expiration_date by using Coalesce to provide a large default value
# so NULLs sort last when sorting ascending (most urgent first)
findings = findings.annotate(
sla_age_days=Coalesce(
ExtractDay(
F("sla_expiration_date") - Coalesce(F("sla_start_date"), TruncDate("created")),
),
Value(999999), # Large value to push NULLs to the end when sorting ascending
output_field=models.IntegerField(),
),
)
# Don't apply initial order_by here - let OrderingFilter handle it via request.GET['o']
# This prevents conflicts between initial ordering and user-requested sorting
findings = self.filter_findings_by_object(findings)
return self.filter_findings_by_filter_name(findings)
def get_fully_filtered_findings(self, request: HttpRequest):
findings = self.get_filtered_findings()
return self.filter_findings_by_form(request, findings)
class ListFindings(View, BaseListFindings):
def get_initial_context(self, request: HttpRequest):
context = {
"filter_name": self.get_filter_name(),
"show_product_column": True,
"custom_breadcrumb": None,
"product_tab": None,
"jira_project": None,
"github_config": None,
"bulk_edit_form": FindingBulkUpdateForm(request.GET),
"enable_table_filtering": get_system_setting("enable_ui_table_based_searching"),
"title_words": get_words_for_field(Finding, "title"),
"component_words": get_words_for_field(Finding, "component_name"),
"visible_test_types": get_visible_scan_types(),
}
# Look to see if the product was used
if product_id := self.get_product_id():
product = get_object_or_404(Product, id=product_id)
user_has_permission_or_403(request.user, product, "view")
context["show_product_column"] = False
context["product_tab"] = Product_Tab(product, title="Findings", tab="findings")
context["jira_project"] = jira_services.get_project(product)
if github_config := GITHUB_PKey.objects.filter(product=product).first():
context["github_config"] = github_config.git_conf_id
elif engagement_id := self.get_engagement_id():
engagement = get_object_or_404(Engagement, id=engagement_id)
user_has_permission_or_403(request.user, engagement, "view")
context["show_product_column"] = False
context["product_tab"] = Product_Tab(engagement.product, title=engagement.name, tab="engagements")
context["jira_project"] = jira_services.get_project(engagement)
if github_config := GITHUB_PKey.objects.filter(product__engagement=engagement).first():
context["github_config"] = github_config.git_conf_id
return request, context
def get_template(self):
return "dojo/findings_list.html"
def add_breadcrumbs(self, request: HttpRequest, context: dict):
# show custom breadcrumb if user has filtered by exactly 1 endpoint
if "endpoints" in request.GET:
endpoint_ids = request.GET.getlist("endpoints", [])
if len(endpoint_ids) == 1 and endpoint_ids[0]:
endpoint_id = endpoint_ids[0]
endpoint = get_object_or_404(Endpoint, id=endpoint_id)
context["filter_name"] = "Vulnerable Endpoints"
context["custom_breadcrumb"] = OrderedDict(
[
("Endpoints", reverse("vulnerable_endpoints")),
(endpoint, reverse("view_endpoint", args=(endpoint.id,))),
],
)
# Show the "All findings" breadcrumb if nothing is coming from the product or engagement
elif not self.get_engagement_id() and not self.get_product_id():
add_breadcrumb(title="Findings", top_level=not len(request.GET), request=request)
return request, context
def get(self, request: HttpRequest, product_id: int | None = None, engagement_id: int | None = None, test_id: int | None = None):
# Store the product, engagement, and test ids
self.product_id = product_id
self.engagement_id = engagement_id
self.test_id = test_id
# Get the initial context
request, context = self.get_initial_context(request)
# Get the filtered findings
filtered_findings = self.get_fully_filtered_findings(request)
# trick to prefetch after paging to avoid huge join generated by select count(*) from Paginator
paged_findings = get_page_items(request, filtered_findings.qs, 25)
# prefetch the related objects in the findings
paged_findings.object_list = prefetch_for_findings(
paged_findings.object_list,
self.get_prefetch_type())
# Add some breadcrumbs
request, context = self.add_breadcrumbs(request, context)
# Add the filtered and paged findings into the context
context |= {
"findings": paged_findings,
"filtered": filtered_findings,
}
# Render the view
return render(request, self.get_template(), context)
class ListOpenFindings(ListFindings):
def get(self, request: HttpRequest, product_id: int | None = None, engagement_id: int | None = None, test_id: int | None = None):
self.filter_name = "Open"
return super().get(request, product_id=product_id, engagement_id=engagement_id, test_id=test_id)
class ListVerifiedFindings(ListFindings):
def get(self, request: HttpRequest, product_id: int | None = None, engagement_id: int | None = None, test_id: int | None = None):
self.filter_name = "Verified"
return super().get(request, product_id=product_id, engagement_id=engagement_id, test_id=test_id)
class ListOutOfScopeFindings(ListFindings):
def get(self, request: HttpRequest, product_id: int | None = None, engagement_id: int | None = None, test_id: int | None = None):
self.filter_name = "Out of Scope"
return super().get(request, product_id=product_id, engagement_id=engagement_id, test_id=test_id)
class ListFalsePositiveFindings(ListFindings):
def get(self, request: HttpRequest, product_id: int | None = None, engagement_id: int | None = None, test_id: int | None = None):
self.filter_name = "False Positive"
return super().get(request, product_id=product_id, engagement_id=engagement_id, test_id=test_id)
class ListInactiveFindings(ListFindings):
def get(self, request: HttpRequest, product_id: int | None = None, engagement_id: int | None = None, test_id: int | None = None):
self.filter_name = "Inactive"
return super().get(request, product_id=product_id, engagement_id=engagement_id, test_id=test_id)
class ListAcceptedFindings(ListFindings):
def get(self, request: HttpRequest, product_id: int | None = None, engagement_id: int | None = None, test_id: int | None = None):
self.filter_name = "Accepted"
return super().get(request, product_id=product_id, engagement_id=engagement_id, test_id=test_id)
class ListClosedFindings(ListFindings):
def get(self, request: HttpRequest, product_id: int | None = None, engagement_id: int | None = None, test_id: int | None = None):
self.filter_name = "Closed"
self.order_by = "-mitigated"
return super().get(request, product_id=product_id, engagement_id=engagement_id, test_id=test_id)
class ViewFinding(View):
def get_finding(self, finding_id: int):
finding_qs = prefetch_for_findings(Finding.objects.filter(id=finding_id), exclude_untouched=False)
return get_object_or_404(finding_qs, id=finding_id)
def get_dojo_user(self, request: HttpRequest):
user = request.user
return get_object_or_404(Dojo_User, id=user.id)
def get_previous_and_next_findings(self, finding: Finding):
# Get the whole list of findings in the current test
findings = (
Finding.objects.filter(test=finding.test)
.order_by("numerical_severity")
.values_list("id", flat=True)
)
logger.debug(findings)
# Set some reasonable defaults
next_finding_id = finding.id
prev_finding_id = finding.id
last_pos = (len(findings)) - 1
# get the index of the current finding
current_finding_index = list(findings).index(finding.id)
# Try to get the previous ID
with contextlib.suppress(IndexError, ValueError):
prev_finding_id = findings[current_finding_index - 1]
# Try to get the next ID
with contextlib.suppress(IndexError, ValueError):
next_finding_id = findings[current_finding_index + 1]
return {
"prev_finding_id": prev_finding_id,
"next_finding_id": next_finding_id,
"findings_list": findings,
"findings_list_lastElement": findings[last_pos],
}
def get_request_response(self, finding: Finding):
request_response = None
burp_request = None
burp_response = None
try:
request_response = BurpRawRequestResponse.objects.filter(finding=finding).first()
if request_response is not None:
burp_request = base64.b64decode(request_response.burpRequestBase64)
burp_response = base64.b64decode(request_response.burpResponseBase64)
except Exception as e:
logger.debug("unsuspected error: %s", e)
return {
"burp_request": burp_request,
"burp_response": burp_response,
}
def get_test_import_data(self, request: HttpRequest, finding: Finding):
test_imports = Test_Import.objects.filter(findings_affected=finding)
test_import_filter = TestImportFilter(request.GET, test_imports)
test_import_finding_actions = finding.test_import_finding_action_set
test_import_finding_actions_count = test_import_finding_actions.all().count()
test_import_finding_actions = test_import_finding_actions.filter(test_import__in=test_import_filter.qs)
test_import_finding_action_filter = TestImportFindingActionFilter(request.GET, test_import_finding_actions)
paged_test_import_finding_actions = get_page_items_and_count(request, test_import_finding_action_filter.qs, 5, prefix="test_import_finding_actions")
paged_test_import_finding_actions.object_list = paged_test_import_finding_actions.object_list.prefetch_related("test_import")
latest_test_import_finding_action = finding.test_import_finding_action_set.order_by("-created").first
return {
"test_import_filter": test_import_filter,
"test_import_finding_action_filter": test_import_finding_action_filter,
"paged_test_import_finding_actions": paged_test_import_finding_actions,
"latest_test_import_finding_action": latest_test_import_finding_action,
"test_import_finding_actions_count": test_import_finding_actions_count,
}
def get_similar_findings(self, request: HttpRequest, finding: Finding):
similar_findings_enabled = get_system_setting("enable_similar_findings", True)
if similar_findings_enabled is False:
return {
"similar_findings_enabled": similar_findings_enabled,
"duplicate_cluster": duplicate_cluster(request, finding),
"similar_findings": None,
"similar_findings_filter": None,
}
# add related actions for non-similar and non-duplicate cluster members
finding.related_actions = calculate_possible_related_actions_for_similar_finding(
request, finding, finding,
)
if finding.duplicate_finding:
finding.duplicate_finding.related_actions = (
calculate_possible_related_actions_for_similar_finding(
request, finding, finding.duplicate_finding,
)
)
filter_string_matching = get_system_setting("filter_string_matching", False)
finding_filter_class = SimilarFindingFilterWithoutObjectLookups if filter_string_matching else SimilarFindingFilter
similar_findings_filter = finding_filter_class(
request.GET,
queryset=get_authorized_findings("view")
.filter(test__engagement__product=finding.test.engagement.product)
.exclude(id=finding.id),
user=request.user,
finding=finding,
)
logger.debug("similar query: %s", similar_findings_filter.qs.query)
similar_findings = get_page_items(
request,
similar_findings_filter.qs,
settings.SIMILAR_FINDINGS_MAX_RESULTS,
prefix="similar",
)
similar_findings.object_list = prefetch_for_similar_findings(
similar_findings.object_list,
)
for similar_finding in similar_findings:
similar_finding.related_actions = (
calculate_possible_related_actions_for_similar_finding(
request, finding, similar_finding,
)
)
return {
"similar_findings_enabled": similar_findings_enabled,
"duplicate_cluster": duplicate_cluster(request, finding),
"similar_findings": similar_findings,
"similar_findings_filter": similar_findings_filter,
}
def get_jira_data(self, finding: Finding):
(
can_be_pushed_to_jira,
can_be_pushed_to_jira_error,
error_code,
) = jira_services.can_be_pushed(finding)
# Check the error code
if error_code:
logger.debug(error_code)
return {
"can_be_pushed_to_jira": can_be_pushed_to_jira,
"can_be_pushed_to_jira_error": can_be_pushed_to_jira_error,
}
def get_note_form(self, request: HttpRequest):
# Set up the args for the form
args = [request.POST] if request.method == "POST" else []
# Set the initial form args
kwargs = {}
return NoteForm(*args, **kwargs)
def get_typed_note_form(self, request: HttpRequest, context: dict):
# Set up the args for the form
args = [request.POST] if request.method == "POST" else []
# Set the initial form args
kwargs = {
"available_note_types": context.get("available_note_types"),
}
return TypedNoteForm(*args, **kwargs)
def get_form(self, request: HttpRequest, context: dict):
return (
self.get_typed_note_form(request, context)
if context.get("note_type_activation")
else self.get_note_form(request)
)
def process_form(self, request: HttpRequest, finding: Finding, context: dict):
if context["form"].is_valid():
# Create the note object
new_note = context["form"].save(commit=False)
new_note.author = request.user
new_note.date = timezone.now()
new_note.save()
# Add an entry to the note history
history = NoteHistory(
data=new_note.entry, time=new_note.date, current_editor=new_note.author,
)
history.save()
new_note.history.add(history)
# Associate the note with the finding
finding.notes.add(new_note)
finding.last_reviewed = new_note.date
finding.last_reviewed_by = context["user"]
finding.save()
# Determine if the note should be sent to jira
if finding.has_jira_issue:
jira_services.add_comment(finding, new_note)
elif finding.has_jira_group_issue:
jira_services.add_comment(finding.finding_group, new_note)
# Send the notification of the note being added
url = request.build_absolute_uri(
reverse("view_finding", args=(finding.id,)),
)
title = f"Finding: {finding.title}"
process_tag_notifications(request, new_note, url, title)
# Add a message to the request
messages.add_message(
request, messages.SUCCESS, "Note saved.", extra_tags="alert-success",
)
return request, True
return request, False
def get_initial_context(self, request: HttpRequest, finding: Finding, user: Dojo_User):
notes = finding.notes.all()
note_type_activation = Note_Type.objects.filter(is_active=True).count()
available_note_types = None
if note_type_activation:
available_note_types = find_available_notetypes(notes)
# Set the current context
context = {
"finding": finding,
"dojo_user": user,
"user": request.user,
"notes": notes,
"files": finding.files.all(),
"note_type_activation": note_type_activation,
"available_note_types": available_note_types,
"enable_table_filtering": get_system_setting("enable_ui_table_based_searching"),
"product_tab": Product_Tab(
finding.test.engagement.product, title="View Finding", tab="findings",
),
}
# Set the form using the context, and then update the context
form = self.get_form(request, context)
context["form"] = form
return context
def get_template(self):
return "dojo/view_finding.html"
def get(self, request: HttpRequest, finding_id: int):
# Get the initial objects
finding = self.get_finding(finding_id)
user = self.get_dojo_user(request)
# Make sure the user is authorized
user_has_permission_or_403(user, finding, "view")
# Set up the initial context
context = self.get_initial_context(request, finding, user)
# Add in the other extras
context |= self.get_previous_and_next_findings(finding)
# Add in more of the other extras
context |= self.get_request_response(finding)
context |= self.get_similar_findings(request, finding)
context |= self.get_test_import_data(request, finding)
context |= self.get_jira_data(finding)
# Render the form
return render(request, self.get_template(), context)
def post(self, request: HttpRequest, finding_id):
# Get the initial objects
finding = self.get_finding(finding_id)
user = self.get_dojo_user(request)
# Make sure the user is authorized
user_has_permission_or_403(user, finding, "view")
# Quick perms check to determine if the user has access to add a note to the finding
user_has_permission_or_403(user, finding, "add")
# Set up the initial context
context = self.get_initial_context(request, finding, user)
# Determine the validity of the form
request, success = self.process_form(request, finding, context)
# Handle the case of a successful form
if success:
return HttpResponseRedirect(reverse("view_finding", args=(finding_id,)))
# Add in more of the other extras
context |= self.get_request_response(finding)
context |= self.get_similar_findings(request, finding)
context |= self.get_test_import_data(request, finding)
context |= self.get_jira_data(finding)
# Render the form
return render(request, self.get_template(), context)
class EditFinding(View):
def get_finding(self, finding_id: int):
return get_object_or_404(Finding, id=finding_id)
def get_request_response(self, finding: Finding):
req_resp = None
if burp_rr := BurpRawRequestResponse.objects.filter(finding=finding).first():
req_resp = (burp_rr.get_request(), burp_rr.get_response())
return req_resp
def get_finding_form(self, request: HttpRequest, finding: Finding):
# Get the burp request if available
req_resp = self.get_request_response(finding)
# Set up the args for the form
args = [request.POST] if request.method == "POST" else []
# Set the initial form args
kwargs = {
"instance": finding,
"req_resp": req_resp,
"can_edit_mitigated_data": finding_helper.can_edit_mitigated_data(request.user),
"initial": {"vulnerability_ids": "\n".join(finding.vulnerability_ids)},
}
return FindingForm(*args, **kwargs)
def get_jira_form(self, request: HttpRequest, finding: Finding, finding_form: FindingForm = None):
# Determine if jira should be used
if (jira_project := jira_services.get_project(finding)) is not None:
# Determine if push all findings is enabled
push_all_findings = jira_services.is_push_all_issues(finding)
# Set up the args for the form
args = [request.POST] if request.method == "POST" else []
# Set the initial form args
kwargs = {
"push_all": push_all_findings,
"prefix": "jiraform",
"instance": finding,
"jira_project": jira_project,
"finding_form": finding_form,
}
return JIRAFindingForm(*args, **kwargs)
return None
def get_github_form(self, request: HttpRequest, finding: Finding):
# Determine if github should be used
if get_system_setting("enable_github"):
# Ensure there is a github conf correctly configured for the product
config_present = GITHUB_PKey.objects.filter(product=finding.test.engagement.product)
if config_present := config_present.exclude(git_conf_id=None):
# Set up the args for the form
args = [request.POST] if request.method == "POST" else []
# Set the initial form args
kwargs = {
"enabled": finding.has_github_issue(),
"prefix": "githubform",
}
return GITHUBFindingForm(*args, **kwargs)
return None
def get_initial_context(self, request: HttpRequest, finding: Finding):
# Get the finding form first since it is used in another place
finding_form = self.get_finding_form(request, finding)
return {
"form": finding_form,
"finding": finding,
"jform": self.get_jira_form(request, finding, finding_form=finding_form),
"gform": self.get_github_form(request, finding),
"return_url": get_return_url(request),
"product_tab": Product_Tab(
finding.test.engagement.product, title="Edit Finding", tab="findings",
),
}
def validate_status_change(self, request: HttpRequest, finding: Finding, context: dict):
# If the finding is already not active, skip this extra validation
if not finding.active:
return request
# Validate the proper notes are added for mitigation
if (not context["form"]["active"].value() or context["form"]["false_p"].value() or context["form"]["out_of_scope"].value()) and not context["form"]["duplicate"].value():
note_type_activation = Note_Type.objects.filter(is_active=True).count()
closing_disabled = 0
if note_type_activation:
closing_disabled = len(get_missing_mandatory_notetypes(finding))
if closing_disabled != 0:
error_inactive = ValidationError(
"Can not set a finding as inactive without adding all mandatory notes",
code="inactive_without_mandatory_notes",
)
error_false_p = ValidationError(
"Can not set a finding as false positive without adding all mandatory notes",
code="false_p_without_mandatory_notes",
)
error_out_of_scope = ValidationError(
"Can not set a finding as out of scope without adding all mandatory notes",
code="out_of_scope_without_mandatory_notes",
)
if context["form"]["active"].value() is False:
context["form"].add_error("active", error_inactive)
if context["form"]["false_p"].value():
context["form"].add_error("false_p", error_false_p)
if context["form"]["out_of_scope"].value():
context["form"].add_error("out_of_scope", error_out_of_scope)
messages.add_message(
request,
messages.ERROR,
("Can not set a finding as inactive, "
"false positive or out of scope without adding all mandatory notes"),
extra_tags="alert-danger",
)
return request
def process_mitigated_data(self, request: HttpRequest, finding: Finding, context: dict):
# If active is not checked and CAN_EDIT_MITIGATED_DATA,
# mitigate the finding and the associated endpoints status
if finding_helper.can_edit_mitigated_data(request.user) and ((
context["form"]["active"].value() is False
or context["form"]["false_p"].value()
or context["form"]["out_of_scope"].value()
) and context["form"]["duplicate"].value() is False):
now = timezone.now()
finding.is_mitigated = True
if settings.V3_FEATURE_LOCATIONS:
for ref in finding.locations.all():
ref.set_status(
FindingLocationStatus.Mitigated,
context["form"].cleaned_data.get("mitigated_by") or request.user,
context["form"].cleaned_data.get("mitigated") or now,
)
else:
# TODO: Delete this after the move to Locations
endpoint_status = finding.status_finding.all()
for status in endpoint_status:
status.mitigated_by = (
context["form"].cleaned_data.get("mitigated_by") or request.user
)
status.mitigated_time = (
context["form"].cleaned_data.get("mitigated") or now
)
status.mitigated = True
status.last_modified = timezone.now()
status.save()
def process_false_positive_history(self, finding: Finding, *, old_false_p: bool = False):
if get_system_setting("false_positive_history", False):
# If the finding is being marked as a false positive we dont need to call the
# fp history function because it will be called by the save function.
# If finding was a false positive and is being reactivated: retroactively reactivates all equal findings.
# old_false_p must be captured before form.save(commit=False) mutates the finding in place.
if old_false_p and not finding.false_p and get_system_setting("retroactive_false_positive_history"):
logger.debug("FALSE_POSITIVE_HISTORY: Reactivating existing findings based on: %s", finding)
# QuerySet.update() bypasses Django signals, which is intentional here — it mirrors
# the previous save_no_options() calls that also disabled all post-save processing.
# match_finding_to_existing_findings returns a lazy QS with no .only() applied,
# so any field can be added here without needing a corresponding .only() change in deduplication.py#_fetch_fp_candidates_for_batch.
match_finding_to_existing_findings(
finding, product=finding.test.engagement.product,
).filter(false_p=True).update(
false_p=False,
active=finding.active,
verified=finding.verified,
out_of_scope=finding.out_of_scope,
is_mitigated=finding.is_mitigated,
)
def process_burp_request_response(self, finding: Finding, context: dict):
if "request" in context["form"].cleaned_data or "response" in context["form"].cleaned_data:
try:
burp_rr, _ = BurpRawRequestResponse.objects.get_or_create(finding=finding)
except BurpRawRequestResponse.MultipleObjectsReturned:
burp_rr = BurpRawRequestResponse.objects.filter(finding=finding).first()
burp_rr.burpRequestBase64 = base64.b64encode(
context["form"].cleaned_data["request"].encode(),
)
burp_rr.burpResponseBase64 = base64.b64encode(
context["form"].cleaned_data["response"].encode(),
)
burp_rr.clean()
burp_rr.save()
def process_finding_form(self, request: HttpRequest, finding: Finding, context: dict):
if context["form"].is_valid():
# process some of the easy stuff first
# Capture false_p before form.save(commit=False) mutates the finding in place,
# so process_false_positive_history can detect a false-positive → active transition.
old_false_p = finding.false_p
new_finding = context["form"].save(commit=False)
new_finding.test = finding.test
new_finding.numerical_severity = Finding.get_numerical_severity(new_finding.severity)
new_finding.last_reviewed = timezone.now()
new_finding.last_reviewed_by = request.user
new_finding.tags = context["form"].cleaned_data["tags"]
# Handle group related things
if "group" in context["form"].cleaned_data:
finding_group = context["form"].cleaned_data["group"]
finding_helper.update_finding_group(new_finding, finding_group)
# Handle risk exception related things
if "risk_accepted" in context["form"].cleaned_data and context["form"]["risk_accepted"].value():
if new_finding.test.engagement.product.enable_simple_risk_acceptance:
ra_helper.simple_risk_accept(request.user, new_finding, perform_save=False)
elif new_finding.risk_accepted:
ra_helper.risk_unaccept(request.user, new_finding, perform_save=False)
# Save and add new locations; replace=True so deselected endpoints are removed
associated_locations = finding_helper.add_locations(new_finding, context["form"], replace=True)
# Remove unrelated endpoints
if settings.V3_FEATURE_LOCATIONS:
for ref in new_finding.locations.all():
if ref.location not in associated_locations:
ref.location.disassociate_from_finding(new_finding)
else:
# TODO: Delete this after the move to Locations
endpoint_status_list = Endpoint_Status.objects.filter(finding=new_finding)
for endpoint_status in endpoint_status_list:
if endpoint_status.endpoint not in new_finding.endpoints.all():
endpoint_status.delete()
# Handle some of the other steps
self.process_mitigated_data(request, new_finding, context)
self.process_false_positive_history(new_finding, old_false_p=old_false_p)
self.process_burp_request_response(new_finding, context)
# Save the vulnerability IDs
finding_helper.save_vulnerability_ids(new_finding, context["form"].cleaned_data["vulnerability_ids"].split())
# Add a success message
messages.add_message(
request,
messages.SUCCESS,
"Finding saved successfully.",
extra_tags="alert-success",
)
return finding, request, True
add_error_message_to_response("The form has errors, please correct them below.")
add_field_errors_to_response(context["form"])
return finding, request, False
def process_jira_form(self, request: HttpRequest, finding: Finding, context: dict):
# Capture case if the jira not being enabled
if context["jform"] is None:
return request, True, False
if context["jform"] and context["jform"].is_valid():
jira_message = None
logger.debug("jform.jira_issue: %s", context["jform"].cleaned_data.get("jira_issue"))
logger.debug(JFORM_PUSH_TO_JIRA_MESSAGE, context["jform"].cleaned_data.get("push_to_jira"))
# can't use helper as when push_all_jira_issues is True, the checkbox gets disabled and is always false
push_to_jira_checkbox = context["jform"].cleaned_data.get("push_to_jira")
push_all_jira_issues = jira_services.is_push_all_issues(finding)
push_to_jira = push_all_jira_issues or push_to_jira_checkbox or jira_services.is_keep_in_sync(finding)
logger.debug("push_to_jira: %s", push_to_jira)
logger.debug("push_all_jira_issues: %s", push_all_jira_issues)
logger.debug("has_jira_group_issue: %s", finding.has_jira_group_issue)
# if the jira issue key was changed, update database
new_jira_issue_key = context["jform"].cleaned_data.get("jira_issue")
# we only support linking / changing if there is no group issue
if not finding.has_jira_group_issue:
if finding.has_jira_issue:
"""
everything in DD around JIRA integration is based on the internal id
of the issue in JIRA instead of on the public jira issue key.
I have no idea why, but it means we have to retrieve the issue from JIRA
to get the internal JIRA id. we can assume the issue exist,
which is already checked in the validation of the form
"""
if not new_jira_issue_key:
jira_services.unlink_finding(request, finding)
jira_message = "Link to JIRA issue removed successfully."
elif new_jira_issue_key != finding.jira_issue.jira_key:
jira_services.unlink_finding(request, finding)
jira_services.link_finding(request, finding, new_jira_issue_key)
jira_message = "Changed JIRA link successfully."
elif new_jira_issue_key:
jira_services.link_finding(request, finding, new_jira_issue_key)
jira_message = "Linked a JIRA issue successfully."
# any existing finding should be updated
# Determine if a message should be added
if jira_message:
messages.add_message(
request, messages.SUCCESS, jira_message, extra_tags="alert-success",
)
return request, True, push_to_jira
add_field_errors_to_response(context["jform"])
return request, False, False
def process_github_form(self, request: HttpRequest, finding: Finding, context: dict, old_status: str):
if "githubform-push_to_github" not in request.POST:
return request, True
if context["gform"].is_valid():
if GITHUB_Issue.objects.filter(finding=finding).exists():
update_external_issue(finding.id, old_status, "github")
else: