From 84de7a70cc0efca4b4e2c45e8bf91d42a8a79e28 Mon Sep 17 00:00:00 2001 From: samiat4911 Date: Tue, 10 Mar 2026 12:05:04 +0100 Subject: [PATCH] fix(awssecurityhub): use parse_cvss_data helper for CVSS extraction --- dojo/tools/awssecurityhub/inspector.py | 22 +++++++++++++++++++ unittests/tools/test_awssecurityhub_parser.py | 8 +++++++ 2 files changed, 30 insertions(+) diff --git a/dojo/tools/awssecurityhub/inspector.py b/dojo/tools/awssecurityhub/inspector.py index 8e78ec71f9a..022aca4238a 100644 --- a/dojo/tools/awssecurityhub/inspector.py +++ b/dojo/tools/awssecurityhub/inspector.py @@ -4,6 +4,7 @@ from dojo.models import Endpoint, Finding from dojo.tools.locations import LocationData +from dojo.utils import parse_cvss_data SEVERITY_MAP = { "INFORMATIONAL": "Info", @@ -31,6 +32,7 @@ def get_item(self, finding: dict, test): references = [] unsaved_vulnerability_ids = [] epss_score = finding.get("EpssScore") + cvss_data = {} description = f"This is an Inspector Finding\n{finding.get('Description', '')}" + "\n" description += f"**AWS Finding ARN:** {finding_id}\n" description += f"**AwsAccountId:** {finding.get('AwsAccountId', '')}\n" @@ -52,6 +54,10 @@ def get_item(self, finding: dict, test): references.append(vendor_url) if vulnerability.get("EpssScore") is not None: epss_score = vulnerability.get("EpssScore") + # Extract and validate CVSS vectors using the common parse_cvss_data helper + for cvss_entry in vulnerability.get("Cvss", []): + if not cvss_data and cvss_entry.get("BaseVector"): + cvss_data = parse_cvss_data(cvss_entry.get("BaseVector")) if finding.get("ProductFields", {}).get("aws/inspector/FindingStatus", "ACTIVE") == "ACTIVE": mitigated = None is_Mitigated = False @@ -120,6 +126,22 @@ def get_item(self, finding: dict, test): result.unsaved_endpoints = locations if epss_score is not None: result.epss_score = epss_score + if cvss_data: + if cvss_data.get("cvssv3"): + result.cvssv3 = cvss_data["cvssv3"] + if cvss_data.get("cvssv4"): + result.cvssv4 = cvss_data["cvssv4"] + # Build severity justification from available CVSS data + severity_parts = [] + if cvss_data.get("cvssv3"): + severity_parts.append(f"CVSS v3 vector: {cvss_data['cvssv3']}") + if cvss_data.get("cvssv4"): + severity_parts.append(f"CVSS v4 vector: {cvss_data['cvssv4']}") + severity_label = finding.get("Severity", {}).get("Label", "") + if severity_label: + severity_parts.append(f"AWS severity: {severity_label}") + if severity_parts: + result.severity_justification = "\n".join(severity_parts) # Add the unsaved vulnerability ids result.unsaved_vulnerability_ids = unsaved_vulnerability_ids return result diff --git a/unittests/tools/test_awssecurityhub_parser.py b/unittests/tools/test_awssecurityhub_parser.py index cbca268841d..c91e9bf3e7a 100644 --- a/unittests/tools/test_awssecurityhub_parser.py +++ b/unittests/tools/test_awssecurityhub_parser.py @@ -72,6 +72,10 @@ def test_inspector_ec2(self): self.assertEqual(1, len(finding.unsaved_vulnerability_ids)) self.assertEqual("CVE-2022-3643", finding.unsaved_vulnerability_ids[0]) self.assertEqual("- Update kernel-4.14.301\n\t- yum update kernel\n", finding.mitigation) + # Verify CVSS v3 extraction via parse_cvss_data helper + self.assertEqual("CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H", finding.cvssv3) + self.assertIn("CVSS v3 vector:", finding.severity_justification) + self.assertIn("AWS severity: CRITICAL", finding.severity_justification) location = self.get_unsaved_locations(finding)[0] self.assertEqual("AwsEc2Instance_arn_aws_ec2_us-east-1_XXXXXXXXXXXX_i-11111111111111111".lower(), location.host.lower()) @@ -97,6 +101,8 @@ def test_inspector_ec2_ghsa(self): self.assertIn("GHSA-p98r-538v-jgw5", finding.title) self.assertSetEqual({"CVE-2023-34256", "GHSA-p98r-538v-jgw5"}, set(finding.unsaved_vulnerability_ids)) self.assertEqual("https://github.com/bottlerocket-os/bottlerocket/security/advisories/GHSA-p98r-538v-jgw5", finding.references) + # Verify backward compatibility: no CVSS data in this fixture + self.assertIsNone(finding.cvssv3) location = self.get_unsaved_locations(finding)[0] self.assertEqual("AwsEc2Instance_arn_aws_ec2_eu-central-1_012345678912_instance_i-07c11cc535d830123".lower(), location.host.lower()) @@ -115,6 +121,8 @@ def test_inspector_ecr(self): self.assertIn("repo-os/sha256:af965ef68c78374a5f987fce98c0ddfa45801df2395bf012c50b863e65978d74", finding.impact) self.assertIn("Repository: repo-os", finding.impact) self.assertEqual(0.0014, finding.epss_score) + # Verify CVSS v3 extraction from the ECR fixture + self.assertEqual("CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:U/C:N/I:N/A:H", finding.cvssv3) location = self.get_unsaved_locations(finding)[0] self.assertEqual("AwsEcrContainerImage_arn_aws_ecr_eu-central-1_123456789012_repository_repo-os_sha256_af965ef68c78374a5f987fce98c0ddfa45801df2395bf012c50b863e65978d74".lower(), location.host.lower())