Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions dojo/tools/aws_inspector2/parser.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import json
from datetime import UTC, datetime

Expand Down Expand Up @@ -114,6 +115,7 @@ def get_cvss_details(self, finding: Finding, raw_finding: dict) -> Finding:

def get_package_vulnerability(self, finding: Finding, raw_finding: dict) -> Finding:
vulnerability_details = raw_finding.get("packageVulnerabilityDetails", {})
vulnerable_packages = vulnerability_details.get("vulnerablePackages", [])
vulnerability_packages_descriptions = "\n".join(
[
(
Expand All @@ -123,14 +125,40 @@ def get_package_vulnerability(self, finding: Finding, raw_finding: dict) -> Find
f"\tfixed version: {vulnerability_package.get('fixedInVersion', 'N/A')}\n"
f"\tremediation: {vulnerability_package.get('remediation', 'N/A')}\n"
)
for vulnerability_package in vulnerability_details.get("vulnerablePackages", [])
for vulnerability_package in vulnerable_packages
],
)
if (vulnerability_id := vulnerability_details.get("vulnerabilityId", None)) is not None:
finding.unsaved_vulnerability_ids = [vulnerability_id]
vulnerability_source = vulnerability_details.get("source")
vulnerability_source_url = vulnerability_details.get("sourceUrl")
# populate fields
# component name/version/file_path from the first vulnerable package
if vulnerable_packages:
finding.component_name = vulnerable_packages[0].get("name")
finding.component_version = vulnerable_packages[0].get("version")
finding.file_path = vulnerable_packages[0].get("filePath")
if settings.V3_FEATURE_LOCATIONS and finding.component_name:
finding.unsaved_locations.append(
LocationData.dependency(
name=finding.component_name,
version=finding.component_version or "",
file_path=finding.file_path or "",
),
)
# reference URLs from the advisory
reference_urls = vulnerability_details.get("referenceUrls", [])
if reference_urls:
finding.references = "\n".join(reference_urls)
# publish date from when the vendor first created the advisory
if vendor_created_at := vulnerability_details.get("vendorCreatedAt"):
with contextlib.suppress(ValueError):
finding.publish_date = date_parser.parse(vendor_created_at).date()
# CVSS v3 base score from the vendor-supplied CVSS entries
for cvss_entry in vulnerability_details.get("cvss", []):
if str(cvss_entry.get("version", "")).startswith("3") and cvss_entry.get("baseScore") is not None:
finding.cvssv3_score = float(cvss_entry["baseScore"])
break
# populate description fields
if vulnerability_source is not None and vulnerability_source_url is not None:
finding.url = vulnerability_source_url
finding.description += (
Expand All @@ -149,8 +177,8 @@ def get_code_vulnerability(self, finding: Finding, raw_finding: dict) -> Finding
file_path_info = raw_finding.get("filePath", {})
file_name = file_path_info.get("fileName", "N/A")
file_path = file_path_info.get("filePath", "N/A")
start_line = file_path_info.get("startLine", "N/A")
end_line = file_path_info.get("endLine", "N/A")
start_line = file_path_info.get("startLine", None)
end_line = file_path_info.get("endLine", None)
Comment thread
Jino-T marked this conversation as resolved.
detector_tags = ", ".join(raw_finding.get("detectorTags", []))
reference_urls = ", ".join(raw_finding.get("referenceUrls", []))
rule_id = raw_finding.get("ruleId", "N/A")
Expand All @@ -162,6 +190,10 @@ def get_code_vulnerability(self, finding: Finding, raw_finding: dict) -> Finding
finding.sast_source_file_path = f"{file_path}{file_name}"
finding.line = start_line
finding.sast_source_line = start_line
if start_line is None:
start_line = "N/A"
if end_line is None:
end_line = "N/A"
finding.description += (
"\n**Additional info**\n"
f"CWEs: {string_cwes}\n"
Expand Down Expand Up @@ -270,9 +302,9 @@ def process_endpoints(self, finding: Finding, raw_finding: dict) -> Finding:
endpoints.append(Endpoint(host=endpoint_host))
finding.impact = "\n".join(impact)
if settings.V3_FEATURE_LOCATIONS:
finding.unsaved_locations = endpoints
finding.unsaved_locations.extend(endpoints)
else:
# TODO: Delete this after the move to Locations
finding.unsaved_endpoints = endpoints
finding.unsaved_endpoints.extend(endpoints)

return finding
Loading
Loading