forked from DefectDojo/django-DefectDojo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinspector.py
More file actions
147 lines (140 loc) · 6.46 KB
/
inspector.py
File metadata and controls
147 lines (140 loc) · 6.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import datetime
from django.conf import settings
from dojo.models import Endpoint, Finding
from dojo.tools.locations import LocationData
from dojo.utils import parse_cvss_data
# Lookup table from the upper-case severity label carried by a finding
# (``finding["Severity"]["Label"]``) to the severity name handed to ``Finding``.
SEVERITY_MAP = dict(
    INFORMATIONAL="Info",
    LOW="Low",
    MEDIUM="Medium",
    HIGH="High",
    CRITICAL="Critical",
)
def get_severity(finding):
    """Return the severity name for ``finding``.

    The label is read from ``finding["Severity"]["Label"]``. A missing
    ``Severity`` or ``Label`` entry is treated as ``"INFORMATIONAL"``, and a
    label that is not a key of ``SEVERITY_MAP`` yields ``"Info"``.
    """
    severity_block = finding.get("Severity", {})
    label = severity_block.get("Label", "INFORMATIONAL")
    if label in SEVERITY_MAP:
        return SEVERITY_MAP[label]
    return "Info"
class Inspector:
    """Turn one AWS Inspector finding dict into an unsaved ``Finding``."""

    def get_item(self, finding: dict, test):
        """Build a ``Finding`` from a single Inspector finding.

        Args:
            finding: The finding as a dict. Keys read here are ``Id``,
                ``Title``, ``Description``, ``AwsAccountId``, ``Region``,
                ``Severity``, ``EpssScore``, ``Vulnerabilities``,
                ``ProductFields``, ``LastObservedAt``, ``Resources`` and
                ``Remediation``; all of them are optional.
            test: Passed through unchanged as ``test`` of the new ``Finding``.

        Returns:
            A ``Finding`` carrying ``unsaved_vulnerability_ids`` and either
            ``unsaved_locations`` or ``unsaved_endpoints`` (selected by
            ``settings.V3_FEATURE_LOCATIONS``). ``epss_score``, ``cvssv3``,
            ``cvssv4`` and ``severity_justification`` are only set when the
            corresponding data is present.

        Raises:
            ValueError: If the finding is not ``ACTIVE`` and its
                ``LastObservedAt`` value is not a valid ISO 8601 timestamp.
        """
        finding_id = finding.get("Id", "")
        title = finding.get("Title", "")
        severity = get_severity(finding)
        impact = []
        references = []
        mitigation_lines = []
        unsaved_vulnerability_ids = []
        epss_score = finding.get("EpssScore")
        cvss_data = {}
        description = (
            f"This is an Inspector Finding\n{finding.get('Description', '')}\n"
            f"**AWS Finding ARN:** {finding_id}\n"
            f"**AwsAccountId:** {finding.get('AwsAccountId', '')}\n"
            f"**Region:** {finding.get('Region', '')}\n"
        )

        for vulnerability in finding.get("Vulnerabilities", []):
            # Save the CVE if it is present, followed by any distinct aliases.
            cve = vulnerability.get("Id")
            if cve:
                unsaved_vulnerability_ids.append(cve)
            unsaved_vulnerability_ids.extend(
                alias
                for alias in vulnerability.get("RelatedVulnerabilities", [])
                if alias != cve
            )
            # One mitigation bullet per vulnerable package, plus an indented
            # sub-bullet when the package carries its own remediation text.
            for package in vulnerability.get("VulnerablePackages", []):
                mitigation_lines.append(
                    f"- Update {package.get('Name', '')}-{package.get('Version', '')}\n",
                )
                if remediation := package.get("Remediation"):
                    mitigation_lines.append(f"\t- {remediation}\n")
            if vendor := vulnerability.get("Vendor"):
                if vendor_url := vendor.get("Url"):
                    references.append(vendor_url)
            # A per-vulnerability EPSS score overrides the finding-level one;
            # with several vulnerabilities the last non-null score wins.
            vulnerability_epss = vulnerability.get("EpssScore")
            if vulnerability_epss is not None:
                epss_score = vulnerability_epss
            # Keep only the first vector that parse_cvss_data turns into data.
            for cvss_entry in vulnerability.get("Cvss", []):
                base_vector = cvss_entry.get("BaseVector")
                if not cvss_data and base_vector:
                    # Normalise a falsy result to {} so .get() below is safe.
                    cvss_data = parse_cvss_data(base_vector) or {}
        mitigation = "".join(mitigation_lines)

        status = finding.get("ProductFields", {}).get("aws/inspector/FindingStatus", "ACTIVE")
        active = status == "ACTIVE"
        is_mitigated = not active
        if active:
            mitigated = None
        elif last_observed := finding.get("LastObservedAt"):
            # fromisoformat accepts the timestamp with or without fractional
            # seconds (and with a trailing "Z" or a numeric offset), so the
            # seconds field is never dropped or misread as microseconds.
            mitigated = datetime.datetime.fromisoformat(last_observed)
            if mitigated.tzinfo is None:
                # Keep the value timezone-aware, like the now(UTC) fallback.
                mitigated = mitigated.replace(tzinfo=datetime.UTC)
        else:
            mitigated = datetime.datetime.now(datetime.UTC)

        title_suffix = ""
        locations = []
        # Bound up front so a finding without resources does not hit an
        # unbound local when the Finding is constructed below.
        component_name = None
        for resource in finding.get("Resources", []):
            # With several resources, component_name and title_suffix end up
            # describing the last one.
            component_name = resource.get("Type")
            host_value = f"{component_name}_{resource.get('Id')}".replace(":", "_").replace("/", "_")
            if settings.V3_FEATURE_LOCATIONS:
                locations.append(LocationData.url(host=host_value))
            else:
                # TODO: Delete this after the move to Locations
                locations.append(Endpoint(host=host_value))
            if component_name == "AwsEcrContainerImage":
                details = resource.get("Details", {}).get("AwsEcrContainerImage")
                arn = resource.get("Id")
                if details:
                    impact.extend((
                        f"Image ARN: {arn}",
                        f"Registry: {details.get('RegistryId')}",
                        f"Repository: {details.get('RepositoryName')}",
                        f"Image digest: {details.get('ImageDigest')}",
                        f"Image tags: {','.join(details.get('ImageTags', []))}",
                    ))
                # repo-name/sha256:digest; [-1] falls back to the whole Id
                # when it contains no "/" instead of raising IndexError.
                image_ref = (arn or "").split("/", 1)[-1]
                title_suffix = f" - Image: {image_ref}"
            else:  # generic implementation
                resource_id = (resource.get("Id") or "").split(":")[-1]
                impact.append(f"Resource: {resource_id}")
                title_suffix = f" - Resource: {resource_id}"

        if remediation_rec_url := finding.get("Remediation", {}).get("Recommendation", {}).get("Url"):
            references.append(remediation_rec_url)

        result = Finding(
            title=f"{title}{title_suffix}",
            test=test,
            description=description,
            mitigation=mitigation,
            references="\n".join(references),
            severity=severity,
            impact="\n".join(impact),
            active=active,
            verified=False,
            false_p=False,
            unique_id_from_tool=finding_id,
            mitigated=mitigated,
            is_mitigated=is_mitigated,
            static_finding=True,
            dynamic_finding=False,
            component_name=component_name,
        )
        if settings.V3_FEATURE_LOCATIONS:
            result.unsaved_locations = locations
        else:
            # TODO: Delete this after the move to Locations
            result.unsaved_endpoints = locations
        if epss_score is not None:
            result.epss_score = epss_score

        # Copy the CVSS vectors onto the finding and build the severity
        # justification from whichever vectors and AWS label are available.
        severity_parts = []
        if cvssv3 := cvss_data.get("cvssv3"):
            result.cvssv3 = cvssv3
            severity_parts.append(f"CVSS v3 vector: {cvssv3}")
        if cvssv4 := cvss_data.get("cvssv4"):
            result.cvssv4 = cvssv4
            severity_parts.append(f"CVSS v4 vector: {cvssv4}")
        severity_label = finding.get("Severity", {}).get("Label", "")
        if severity_label:
            severity_parts.append(f"AWS severity: {severity_label}")
        if severity_parts:
            result.severity_justification = "\n".join(severity_parts)

        # Add the unsaved vulnerability ids
        result.unsaved_vulnerability_ids = unsaved_vulnerability_ids
        return result