Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion scanpipe/pipes/vulnerablecode.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,46 @@ def filter_vulnerabilities(vulnerabilities, ignore_set):
and not any(alias in ignore_set for alias in vulnerability.get("aliases", []))
]

def normalize_advisories(advisories):
"""
Convert advisory-based API responses into vulnerability-like structure
for backward compatibility.
"""
normalized = []

for advisory in advisories or []:
entry = {
"vulnerability_id": advisory.get("advisory_id")
or advisory.get("id"),
"aliases": advisory.get("aliases", []),
"summary": advisory.get("summary")
or advisory.get("description"),
"references": advisory.get("references", []),
}
normalized.append(entry)

return normalized


def extract_security_entries(package_data):
"""
Return a normalized list of security entries from package_data.
Supports both vulnerability-based and advisory-based API responses.
"""
if not package_data:
return []

if "affected_by_vulnerabilities" in package_data:
return package_data.get("affected_by_vulnerabilities", [])

if "affected_by_advisories" in package_data:
logger.debug(
"VulnerableCode advisory-based response detected; normalizing entries."
)
advisories = package_data.get("affected_by_advisories", [])
return normalize_advisories(advisories)

return []

def fetch_vulnerabilities(
packages, chunk_size=1000, logger=logger.info, ignore_set=None
Expand All @@ -229,7 +269,7 @@ def fetch_vulnerabilities(
unsaved_objects = []
for package in packages:
if package_data := vulnerabilities_by_purl.get(package.package_url):
affected_by = package_data.get("affected_by_vulnerabilities", [])
affected_by = extract_security_entries(package_data)

if ignore_set and affected_by:
affected_by = filter_vulnerabilities(affected_by, ignore_set)
Expand Down
50 changes: 50 additions & 0 deletions scanpipe/tests/pipes/test_vulnerablecode.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,56 @@ def test_scanpipe_pipes_vulnerablecode_fetch_vulnerabilities(
django_5_0.refresh_from_db()
self.assertEqual(1, len(django_5_0.affected_by_vulnerabilities))

@mock.patch("scanpipe.pipes.vulnerablecode.bulk_search_by_purl")
def test_fetch_vulnerabilities_with_advisory_response(
self, mock_search_by_purl
):
django_5_0 = make_package(self.project1, "pkg:pypi/django@5.0")

advisory_response = {
"purl": "pkg:pypi/django@5.0",
"affected_by_advisories": [
{
"advisory_id": "ADV-123",
"aliases": ["CVE-2024-0001"],
"summary": "Test advisory summary",
}
],
}

mock_search_by_purl.return_value = [advisory_response]

fetch_vulnerabilities(packages=[django_5_0])
django_5_0.refresh_from_db()

self.assertEqual(1, len(django_5_0.affected_by_vulnerabilities))
self.assertEqual(
"ADV-123",
django_5_0.affected_by_vulnerabilities[0]["vulnerability_id"],
)
self.assertEqual(
["CVE-2024-0001"],
django_5_0.affected_by_vulnerabilities[0]["aliases"],
)

@mock.patch("scanpipe.pipes.vulnerablecode.bulk_search_by_purl")
def test_fetch_vulnerabilities_with_unexpected_response(
self, mock_search_by_purl
):
django_5_0 = make_package(self.project1, "pkg:pypi/django@5.0")

unexpected_response = {
"purl": "pkg:pypi/django@5.0",
# No vulnerabilities or advisories field
}

mock_search_by_purl.return_value = [unexpected_response]

fetch_vulnerabilities(packages=[django_5_0])
django_5_0.refresh_from_db()

self.assertEqual([], django_5_0.affected_by_vulnerabilities)

def test_scanpipe_pipes_vulnerablecode_filter_vulnerabilities(self):
data = self.data / "vulnerablecode/django-5.0_package_data.json"
package_data = json.loads(data.read_text())
Expand Down
Loading