Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from vulnerabilities.pipelines.v2_importers import gentoo_importer as gentoo_importer_v2
from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2
from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2
from vulnerabilities.pipelines.v2_importers import zdi_importer as zdi_importer_v2
from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2
from vulnerabilities.pipelines.v2_importers import mattermost_importer as mattermost_importer_v2
from vulnerabilities.pipelines.v2_importers import mozilla_importer as mozilla_importer_v2
Expand Down Expand Up @@ -110,6 +111,7 @@
ruby_importer_v2.RubyImporterPipeline,
epss_importer_v2.EPSSImporterPipeline,
gentoo_importer_v2.GentooImporterPipeline,
zdi_importer_v2.ZDIImporterPipeline,
nginx_importer_v2.NginxImporterPipeline,
debian_importer_v2.DebianImporterPipeline,
mattermost_importer_v2.MattermostImporterPipeline,
Expand Down
139 changes: 139 additions & 0 deletions vulnerabilities/pipelines/v2_importers/zdi_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging
import re
from datetime import datetime
from datetime import timezone
from typing import Iterable
from xml.etree import ElementTree

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.utils import fetch_response

logger = logging.getLogger(__name__)

ZDI_RSS_YEAR_URL = "https://www.zerodayinitiative.com/rss/published/{year}/"
ZDI_START_YEAR = 2007
ZDI_ID_RE = re.compile(r"ZDI-\d+-\d+")
CVE_RE = re.compile(r"CVE-\d{4}-\d{4,7}")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have a function for this
see utils.py file.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ziadhany
Fixed all points:
Removed repo_url
Removed seen_ids: agreed, the pipeline framework handles deduplication at the DB level
Replaced strptime + PUBDATE_FORMAT with dateparser.parse()
Switched to find_all_cve from utils.py
Added CVSS score parsing via CVSS_RE = re.compile(r"CVSS rating of (\d+\.?\d*)") - the RSS description contains e.g. "The ZDI has assigned a CVSS rating of 8.8." so this extracts the score and stores it as a VulnerabilitySeverity with GENERIC system

Also fixed two CI failures that showed up:
Black - zdi_importer.py and test_zdi_importer.py were not formatted to --line-length 100
isort - __init__.py had the zdi_importer import inserted out of alphabetical order (between gitlab and istio instead of after xen)

PUBDATE_FORMAT = "%a, %d %b %Y %H:%M:%S %z"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
PUBDATE_FORMAT = "%a, %d %b %Y %H:%M:%S %z"



class ZDIImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
"""Collect ZDI security advisories from the Zero Day Initiative RSS feeds."""

pipeline_id = "zdi_importer"
spdx_license_expression = "LicenseRef-scancode-proprietary-license"
license_url = "https://www.zerodayinitiative.com"
repo_url = "https://www.zerodayinitiative.com"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't think we need this.

Suggested change
repo_url = "https://www.zerodayinitiative.com"

precedence = 200

@classmethod
def steps(cls):
return (cls.collect_and_store_advisories,)

def advisories_count(self) -> int:
return 0

def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
current_year = datetime.now(tz=timezone.utc).year
urls = [
ZDI_RSS_YEAR_URL.format(year=year) for year in range(ZDI_START_YEAR, current_year + 1)
]

seen_ids = set()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we have a seed ids ?

for url in urls:
self.log(f"Fetching ZDI RSS feed: {url}")
try:
response = fetch_response(url)
items = parse_rss_feed(response.text)
except Exception as e:
logger.error("Failed to fetch %s: %s", url, e)
continue

for item in items:
advisory = parse_advisory_data(item)
if advisory and advisory.advisory_id not in seen_ids:
seen_ids.add(advisory.advisory_id)
yield advisory


def parse_rss_feed(xml_text: str) -> list:
"""
Parse ZDI RSS feed XML text and return a list of raw item dicts.
Each dict has keys: ``title``, ``link``, ``description``, ``pub_date``.
Returns an empty list if the XML is malformed or has no ``<channel>`` element.
"""
try:
root = ElementTree.fromstring(xml_text)
except ElementTree.ParseError as e:
logger.error("Failed to parse RSS XML: %s", e)
return []

channel = root.find("channel")
if channel is None:
logger.error("RSS feed has no <channel> element")
return []

items = []
for item_el in channel.findall("item"):
items.append(
{
"title": (item_el.findtext("title") or "").strip(),
"link": (item_el.findtext("link") or "").strip(),
"description": (item_el.findtext("description") or "").strip(),
"pub_date": (item_el.findtext("pubDate") or "").strip(),
}
)
return items


def parse_advisory_data(item: dict):
"""
Parse a single ZDI RSS item dict into an AdvisoryDataV2 object.
Returns ``None`` if a ZDI advisory ID cannot be extracted from the link URL.
The RSS feed does not carry structured package data, so ``affected_packages``
is always empty.
"""
link = item.get("link") or ""
title = item.get("title") or ""
description = item.get("description") or ""
pub_date_str = item.get("pub_date") or ""

match = ZDI_ID_RE.search(link)
if not match:
logger.error("Could not extract ZDI advisory ID from link: %r", link)
return None

advisory_id = match.group(0)
aliases = list(dict.fromkeys(CVE_RE.findall(description)))

date_published = None
if pub_date_str:
try:
date_published = datetime.strptime(pub_date_str, PUBDATE_FORMAT)
except ValueError:
logger.warning("Could not parse date %r for advisory %s", pub_date_str, advisory_id)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be a better way to do this. You could use the dateparser library. See the other importers for how we handle this parsing

>> import dateparser
>> dateparser.parse("Tue, 07 Jan 2025 00:00:00 -0600")
>> datetime.datetime(2025, 1, 7, 0, 0, tzinfo=<StaticTzInfo 'UTC-06:00'>)


references = []
if link:
references.append(ReferenceV2(url=link))

return AdvisoryDataV2(
advisory_id=advisory_id,
aliases=aliases,
summary=title,
affected_packages=[],
references=references,
date_published=date_published,
url=link,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"advisory_id": "ZDI-25-001",
"aliases": [
"CVE-2025-12345"
],
"summary": "ZDI-25-001: Example Vendor Product Remote Code Execution Vulnerability",
"affected_packages": [],
"references": [
{
"reference_id": "",
"reference_type": "",
"url": "http://www.zerodayinitiative.com/advisories/ZDI-25-001/"
}
],
"patches": [],
"severities": [],
"date_published": "2025-01-06T00:00:00-06:00",
"weaknesses": [],
"url": "http://www.zerodayinitiative.com/advisories/ZDI-25-001/"
}
22 changes: 22 additions & 0 deletions vulnerabilities/tests/test_data/zdi/zdi_rss_mock.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0">
<channel>
<title>Zero Day Initiative - Published Advisories</title>
<link>http://www.zerodayinitiative.com</link>
<description>Published ZDI Advisories</description>
<item>
<title><![CDATA[ZDI-25-001: Example Vendor Product Remote Code Execution Vulnerability]]></title>
<guid isPermaLink="false">ZDI-CAN-12345</guid>
<link>http://www.zerodayinitiative.com/advisories/ZDI-25-001/</link>
<description><![CDATA[This vulnerability allows remote attackers to execute arbitrary code on affected installations of Example Vendor Product. User interaction is required to exploit this vulnerability. The ZDI has assigned a CVSS rating of 8.8. The following CVEs are assigned: CVE-2025-12345.]]></description>
<pubDate>Mon, 06 Jan 2025 00:00:00 -0600</pubDate>
</item>
<item>
<title><![CDATA[ZDI-25-002: Another Vendor Product Information Disclosure Vulnerability]]></title>
<guid isPermaLink="false">ZDI-CAN-67890</guid>
<link>http://www.zerodayinitiative.com/advisories/ZDI-25-002/</link>
<description><![CDATA[This vulnerability allows remote attackers to disclose sensitive information on affected installations of Another Vendor Product. No user interaction is required to exploit this vulnerability. The ZDI has assigned a CVSS rating of 5.3. No CVE has been assigned to this advisory at this time.]]></description>
<pubDate>Tue, 07 Jan 2025 00:00:00 -0600</pubDate>
</item>
</channel>
</rss>
88 changes: 88 additions & 0 deletions vulnerabilities/tests/test_zdi_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import os
from unittest import TestCase

from vulnerabilities.pipelines.v2_importers.zdi_importer import parse_advisory_data
from vulnerabilities.pipelines.v2_importers.zdi_importer import parse_rss_feed
from vulnerabilities.tests import util_tests

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA = os.path.join(BASE_DIR, "test_data/zdi")


def _load_rss(filename="zdi_rss_mock.xml"):
with open(os.path.join(TEST_DATA, filename), encoding="utf-8") as f:
return f.read()


class TestZDIImporter(TestCase):
def test_parse_rss_feed_returns_correct_item_count(self):
"""parse_rss_feed returns one dict per <item> in the RSS feed."""
items = parse_rss_feed(_load_rss())
self.assertEqual(len(items), 2)

def test_parse_rss_feed_item_fields(self):
"""Each parsed item dict contains the expected keys and values."""
items = parse_rss_feed(_load_rss())
first = items[0]
self.assertEqual(
first["title"], "ZDI-25-001: Example Vendor Product Remote Code Execution Vulnerability"
)
self.assertEqual(first["link"], "http://www.zerodayinitiative.com/advisories/ZDI-25-001/")
self.assertIn("CVE-2025-12345", first["description"])
self.assertEqual(first["pub_date"], "Mon, 06 Jan 2025 00:00:00 -0600")

def test_parse_advisory_with_cve(self):
"""Advisory with CVE alias and pubDate is parsed into a correct AdvisoryDataV2."""
items = parse_rss_feed(_load_rss())
result = parse_advisory_data(items[0])
self.assertIsNotNone(result)
result_dict = result.to_dict()
expected_file = os.path.join(TEST_DATA, "expected_zdi_advisory_output1.json")
util_tests.check_results_against_json(result_dict, expected_file)

def test_parse_advisory_no_cve_has_empty_aliases(self):
"""Advisory whose description contains no CVE IDs has an empty aliases list."""
items = parse_rss_feed(_load_rss())
result = parse_advisory_data(items[1])
self.assertIsNotNone(result)
self.assertEqual(result.advisory_id, "ZDI-25-002")
self.assertEqual(result.aliases, [])

def test_parse_advisory_missing_link_returns_none(self):
"""Advisory with an empty link (no ZDI ID) must return None."""
item = {
"title": "ZDI-25-999: Test Advisory",
"link": "",
"description": "Some description. CVE-2025-99999.",
"pub_date": "Mon, 06 Jan 2025 00:00:00 -0600",
}
result = parse_advisory_data(item)
self.assertIsNone(result)

def test_parse_rss_feed_invalid_xml_returns_empty(self):
"""Malformed XML input returns an empty list without raising."""
result = parse_rss_feed("not valid xml <>>>")
self.assertEqual(result, [])

def test_parse_advisory_zdi_id_not_in_aliases(self):
"""The ZDI advisory ID must be advisory_id only, not duplicated in aliases."""
item = {
"title": "ZDI-25-100: Some Vulnerability",
"link": "http://www.zerodayinitiative.com/advisories/ZDI-25-100/",
"description": "CVSS 7.0. CVE-2025-11111.",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are missing CVSS parsing. See:
https://www.zerodayinitiative.com/advisories/published/2025/

Also, have a look at the other importers to see how we can store the CVSS score.

"pub_date": "Wed, 08 Jan 2025 00:00:00 -0600",
}
result = parse_advisory_data(item)
self.assertIsNotNone(result)
self.assertEqual(result.advisory_id, "ZDI-25-100")
self.assertNotIn("ZDI-25-100", result.aliases)
self.assertIn("CVE-2025-11111", result.aliases)
Loading