diff --git a/scanpipe/pipes/vulnerablecode.py b/scanpipe/pipes/vulnerablecode.py index 6c6073b5d0..46ca8a769c 100644 --- a/scanpipe/pipes/vulnerablecode.py +++ b/scanpipe/pipes/vulnerablecode.py @@ -211,6 +211,46 @@ def filter_vulnerabilities(vulnerabilities, ignore_set): and not any(alias in ignore_set for alias in vulnerability.get("aliases", [])) ] +def normalize_advisories(advisories): + """ + Convert advisory-based API responses into vulnerability-like structure + for backward compatibility. + """ + normalized = [] + + for advisory in advisories or []: + entry = { + "vulnerability_id": advisory.get("advisory_id") + or advisory.get("id"), + "aliases": advisory.get("aliases", []), + "summary": advisory.get("summary") + or advisory.get("description"), + "references": advisory.get("references", []), + } + normalized.append(entry) + + return normalized + + +def extract_security_entries(package_data): + """ + Return a normalized list of security entries from package_data. + Supports both vulnerability-based and advisory-based API responses. + """ + if not package_data: + return [] + + if "affected_by_vulnerabilities" in package_data: + return package_data.get("affected_by_vulnerabilities", []) + + if "affected_by_advisories" in package_data: + logger.debug( + "VulnerableCode advisory-based response detected; normalizing entries." + ) + advisories = package_data.get("affected_by_advisories", []) + return normalize_advisories(advisories) + + return [] def fetch_vulnerabilities( packages, chunk_size=1000, logger=logger.info, ignore_set=None @@ -229,7 +269,7 @@ def fetch_vulnerabilities( unsaved_objects = [] for package in packages: if package_data := vulnerabilities_by_purl.get(package.package_url): - affected_by = package_data.get("affected_by_vulnerabilities", []) + affected_by = extract_security_entries(package_data) if ignore_set and affected_by: affected_by = filter_vulnerabilities(affected_by, ignore_set) diff --git a/scanpipe/tests/pipes/test_vulnerablecode.py b/scanpipe/tests/pipes/test_vulnerablecode.py index e39dbf69ed..564d59a371 100644 --- a/scanpipe/tests/pipes/test_vulnerablecode.py +++ b/scanpipe/tests/pipes/test_vulnerablecode.py @@ -64,6 +64,56 @@ def test_scanpipe_pipes_vulnerablecode_fetch_vulnerabilities( django_5_0.refresh_from_db() self.assertEqual(1, len(django_5_0.affected_by_vulnerabilities)) + @mock.patch("scanpipe.pipes.vulnerablecode.bulk_search_by_purl") + def test_fetch_vulnerabilities_with_advisory_response( + self, mock_search_by_purl + ): + django_5_0 = make_package(self.project1, "pkg:pypi/django@5.0") + + advisory_response = { + "purl": "pkg:pypi/django@5.0", + "affected_by_advisories": [ + { + "advisory_id": "ADV-123", + "aliases": ["CVE-2024-0001"], + "summary": "Test advisory summary", + } + ], + } + + mock_search_by_purl.return_value = [advisory_response] + + fetch_vulnerabilities(packages=[django_5_0]) + django_5_0.refresh_from_db() + + self.assertEqual(1, len(django_5_0.affected_by_vulnerabilities)) + self.assertEqual( + "ADV-123", + django_5_0.affected_by_vulnerabilities[0]["vulnerability_id"], + ) + self.assertEqual( + ["CVE-2024-0001"], + django_5_0.affected_by_vulnerabilities[0]["aliases"], + ) + + @mock.patch("scanpipe.pipes.vulnerablecode.bulk_search_by_purl") + def test_fetch_vulnerabilities_with_unexpected_response( + self, mock_search_by_purl + ): + django_5_0 = make_package(self.project1, "pkg:pypi/django@5.0") + + unexpected_response = { + "purl": "pkg:pypi/django@5.0", + # No vulnerabilities or advisories field + } + + mock_search_by_purl.return_value = [unexpected_response] + + fetch_vulnerabilities(packages=[django_5_0]) + django_5_0.refresh_from_db() + + self.assertEqual([], django_5_0.affected_by_vulnerabilities) + def test_scanpipe_pipes_vulnerablecode_filter_vulnerabilities(self): data = self.data / "vulnerablecode/django-5.0_package_data.json" package_data = json.loads(data.read_text())