Skip to content

Commit ef6dd16

Browse files
committed
Add ZDI security advisory importer
Implements ZDI importer pipeline (issue 1471) using the Zero Day Initiative RSS feeds at zerodayinitiative.com/rss/published/YEAR/. Fetches year-specific feeds from 2007 through the current year, parses advisory ID from the link URL, extracts CVE aliases from the description, and records the publication date. Deduplicates advisories across feeds using a seen-IDs set. Includes 7 unit tests covering normal parsing, missing CVE, missing link, invalid XML, and alias deduplication. Signed-off-by: newklei <magmacicada@proton.me>
1 parent 2dbbd38 commit ef6dd16

File tree

5 files changed

+300
-0
lines changed

5 files changed

+300
-0
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
from vulnerabilities.pipelines.v2_importers import gentoo_importer as gentoo_importer_v2
5959
from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2
6060
from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2
61+
from vulnerabilities.pipelines.v2_importers import zdi_importer as zdi_importer_v2
6162
from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2
6263
from vulnerabilities.pipelines.v2_importers import mattermost_importer as mattermost_importer_v2
6364
from vulnerabilities.pipelines.v2_importers import mozilla_importer as mozilla_importer_v2
@@ -110,6 +111,7 @@
110111
ruby_importer_v2.RubyImporterPipeline,
111112
epss_importer_v2.EPSSImporterPipeline,
112113
gentoo_importer_v2.GentooImporterPipeline,
114+
zdi_importer_v2.ZDIImporterPipeline,
113115
nginx_importer_v2.NginxImporterPipeline,
114116
debian_importer_v2.DebianImporterPipeline,
115117
mattermost_importer_v2.MattermostImporterPipeline,
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import logging
11+
import re
12+
from datetime import datetime
13+
from datetime import timezone
14+
from typing import Iterable
15+
from xml.etree import ElementTree
16+
17+
from vulnerabilities.importer import AdvisoryDataV2
18+
from vulnerabilities.importer import ReferenceV2
19+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
20+
from vulnerabilities.utils import fetch_response
21+
22+
logger = logging.getLogger(__name__)

# Year-specific RSS feed of published advisories; ``{year}`` is filled in per request.
ZDI_RSS_YEAR_URL = "https://www.zerodayinitiative.com/rss/published/{year}/"
# First year for which a feed is requested.
ZDI_START_YEAR = 2007
# Advisory identifier as it appears in advisory links, e.g. ``ZDI-25-001``.
ZDI_ID_RE = re.compile(r"ZDI-\d+-\d+")
# CVE identifier: a 4-digit year followed by a sequence number of four or more digits.
# The word boundaries keep the pattern from matching inside a longer token and from
# returning a truncated (and therefore wrong) ID when the sequence number is long.
CVE_RE = re.compile(r"\bCVE-\d{4}-\d{4,}\b")
# ``strptime`` layout of the RSS ``pubDate`` value, e.g. "Mon, 06 Jan 2025 00:00:00 -0600".
PUBDATE_FORMAT = "%a, %d %b %Y %H:%M:%S %z"
29+
30+
31+
class ZDIImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """
    Import advisories published in the Zero Day Initiative (ZDI) RSS feeds.

    One feed is requested per year, from ``ZDI_START_YEAR`` through the current
    UTC year, and each advisory ID is yielded at most once per run.
    """

    pipeline_id = "zdi_importer"
    spdx_license_expression = "LicenseRef-scancode-proprietary-license"
    license_url = "https://www.zerodayinitiative.com"
    repo_url = "https://www.zerodayinitiative.com"
    precedence = 200

    @classmethod
    def steps(cls):
        """Return the single step run by this pipeline."""
        return (cls.collect_and_store_advisories,)

    def advisories_count(self) -> int:
        """Return 0: no count is computed before the feeds are fetched."""
        return 0

    def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
        """
        Yield one AdvisoryDataV2 per distinct advisory ID found in the yearly feeds.

        A feed that cannot be fetched or parsed is logged and skipped; the
        remaining years are still processed.
        """
        last_year = datetime.now(tz=timezone.utc).year
        yielded_ids = set()

        for year in range(ZDI_START_YEAR, last_year + 1):
            feed_url = ZDI_RSS_YEAR_URL.format(year=year)
            self.log(f"Fetching ZDI RSS feed: {feed_url}")
            try:
                feed_items = parse_rss_feed(fetch_response(feed_url).text)
            except Exception as e:
                logger.error("Failed to fetch %s: %s", feed_url, e)
                continue

            for feed_item in feed_items:
                advisory = parse_advisory_data(feed_item)
                # Skip items without a usable ID and IDs already yielded from another feed.
                if not advisory or advisory.advisory_id in yielded_ids:
                    continue
                yielded_ids.add(advisory.advisory_id)
                yield advisory
70+
71+
def parse_rss_feed(xml_text: str) -> list:
    """
    Return the ``<item>`` entries of a ZDI RSS document as a list of plain dicts.

    Every dict holds the stripped text of the item's ``title``, ``link``,
    ``description`` and ``pubDate`` children under the keys ``title``, ``link``,
    ``description`` and ``pub_date``; an absent or empty child yields ``""``.
    Malformed XML, or a root element without a ``<channel>`` child, is logged
    as an error and produces an empty list.

    >>> xml = (
    ...     '<?xml version="1.0"?><rss version="2.0"><channel>'
    ...     '<item><title>ZDI-25-001: Test</title>'
    ...     '<link>http://www.zerodayinitiative.com/advisories/ZDI-25-001/</link>'
    ...     '<description>CVE-2025-12345</description>'
    ...     '<pubDate>Mon, 06 Jan 2025 00:00:00 -0600</pubDate>'
    ...     '</item></channel></rss>'
    ... )
    >>> items = parse_rss_feed(xml)
    >>> len(items)
    1
    >>> items[0]['title']
    'ZDI-25-001: Test'
    """
    try:
        channel = ElementTree.fromstring(xml_text).find("channel")
    except ElementTree.ParseError as e:
        logger.error("Failed to parse RSS XML: %s", e)
        return []

    if channel is None:
        logger.error("RSS feed has no <channel> element")
        return []

    # Output key -> child tag read from each <item>.
    field_tags = {
        "title": "title",
        "link": "link",
        "description": "description",
        "pub_date": "pubDate",
    }
    return [
        {key: (item_el.findtext(tag) or "").strip() for key, tag in field_tags.items()}
        for item_el in channel.findall("item")
    ]
114+
115+
116+
def _parse_pub_date(pub_date_str: str):
    """
    Return a timezone-aware datetime for an RSS ``pubDate`` string, or ``None``.

    ``None`` is returned for an empty string and for text that cannot be parsed.
    """
    # Function-scope import: the module's import block is left untouched.
    from email.utils import parsedate_to_datetime

    if not pub_date_str:
        return None

    try:
        # Strict layout first, so dates that already parsed keep the exact same result.
        return datetime.strptime(pub_date_str, PUBDATE_FORMAT)
    except ValueError:
        # ``%a``/``%b`` follow the process locale and ``%z`` rejects zone names such
        # as "GMT"; fall through to the locale-independent RFC 2822 parser.
        pass

    try:
        parsed = parsedate_to_datetime(pub_date_str)
    except (TypeError, ValueError):
        # Older Python versions raise TypeError, newer ones ValueError, on bad input.
        return None

    if parsed.tzinfo is None:
        # The RFC 2822 parser returns a naive datetime when no usable offset is
        # present; treat it as UTC so callers always receive an aware value.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def parse_advisory_data(item: dict):
    """
    Parse a single ZDI RSS item dict into an AdvisoryDataV2 object.

    ``item`` is read through the keys ``title``, ``link``, ``description`` and
    ``pub_date``; a missing or empty value is treated as an empty string.

    Returns ``None`` if a ZDI advisory ID cannot be extracted from the link URL.
    CVE IDs found in the description become the aliases, de-duplicated and in
    order of first appearance. The title is used as the summary and the link as
    both the advisory URL and its only reference. ``date_published`` is ``None``
    when the publication date is absent or cannot be parsed. The RSS feed does
    not carry structured package data, so ``affected_packages`` is always empty.

    >>> item = {
    ...     "title": "ZDI-25-001: Example Remote Code Execution",
    ...     "link": "http://www.zerodayinitiative.com/advisories/ZDI-25-001/",
    ...     "description": "CVSS rating of 8.8. CVE-2025-12345.",
    ...     "pub_date": "Mon, 06 Jan 2025 00:00:00 -0600",
    ... }
    >>> result = parse_advisory_data(item)
    >>> result.advisory_id
    'ZDI-25-001'
    >>> result.aliases
    ['CVE-2025-12345']
    """
    link = item.get("link") or ""
    title = item.get("title") or ""
    description = item.get("description") or ""
    pub_date_str = item.get("pub_date") or ""

    match = ZDI_ID_RE.search(link)
    if not match:
        logger.error("Could not extract ZDI advisory ID from link: %r", link)
        return None

    advisory_id = match.group(0)
    # dict.fromkeys drops repeated CVE IDs while keeping first-seen order.
    aliases = list(dict.fromkeys(CVE_RE.findall(description)))

    date_published = _parse_pub_date(pub_date_str)
    if pub_date_str and date_published is None:
        logger.warning("Could not parse date %r for advisory %s", pub_date_str, advisory_id)

    return AdvisoryDataV2(
        advisory_id=advisory_id,
        aliases=aliases,
        summary=title,
        affected_packages=[],
        # ``link`` cannot be empty here: the advisory ID was matched inside it.
        references=[ReferenceV2(url=link)],
        date_published=date_published,
        url=link,
    )
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"advisory_id": "ZDI-25-001",
3+
"aliases": [
4+
"CVE-2025-12345"
5+
],
6+
"summary": "ZDI-25-001: Example Vendor Product Remote Code Execution Vulnerability",
7+
"affected_packages": [],
8+
"references": [
9+
{
10+
"reference_id": "",
11+
"reference_type": "",
12+
"url": "http://www.zerodayinitiative.com/advisories/ZDI-25-001/"
13+
}
14+
],
15+
"patches": [],
16+
"severities": [],
17+
"date_published": "2025-01-06T00:00:00-06:00",
18+
"weaknesses": [],
19+
"url": "http://www.zerodayinitiative.com/advisories/ZDI-25-001/"
20+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<rss version="2.0">
3+
<channel>
4+
<title>Zero Day Initiative - Published Advisories</title>
5+
<link>http://www.zerodayinitiative.com</link>
6+
<description>Published ZDI Advisories</description>
7+
<item>
8+
<title><![CDATA[ZDI-25-001: Example Vendor Product Remote Code Execution Vulnerability]]></title>
9+
<guid isPermaLink="false">ZDI-CAN-12345</guid>
10+
<link>http://www.zerodayinitiative.com/advisories/ZDI-25-001/</link>
11+
<description><![CDATA[This vulnerability allows remote attackers to execute arbitrary code on affected installations of Example Vendor Product. User interaction is required to exploit this vulnerability. The ZDI has assigned a CVSS rating of 8.8. The following CVEs are assigned: CVE-2025-12345.]]></description>
12+
<pubDate>Mon, 06 Jan 2025 00:00:00 -0600</pubDate>
13+
</item>
14+
<item>
15+
<title><![CDATA[ZDI-25-002: Another Vendor Product Information Disclosure Vulnerability]]></title>
16+
<guid isPermaLink="false">ZDI-CAN-67890</guid>
17+
<link>http://www.zerodayinitiative.com/advisories/ZDI-25-002/</link>
18+
<description><![CDATA[This vulnerability allows remote attackers to disclose sensitive information on affected installations of Another Vendor Product. No user interaction is required to exploit this vulnerability. The ZDI has assigned a CVSS rating of 5.3. No CVE has been assigned to this advisory at this time.]]></description>
19+
<pubDate>Tue, 07 Jan 2025 00:00:00 -0600</pubDate>
20+
</item>
21+
</channel>
22+
</rss>
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import os
11+
from unittest import TestCase
12+
13+
from vulnerabilities.pipelines.v2_importers.zdi_importer import parse_advisory_data
14+
from vulnerabilities.pipelines.v2_importers.zdi_importer import parse_rss_feed
15+
from vulnerabilities.tests import util_tests
16+
17+
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
18+
TEST_DATA = os.path.join(BASE_DIR, "test_data/zdi")
19+
20+
21+
def _load_rss(filename="zdi_rss_mock.xml"):
    """Return the UTF-8 text of the fixture ``filename`` located under ``TEST_DATA``."""
    fixture_path = os.path.join(TEST_DATA, filename)
    with open(fixture_path, encoding="utf-8") as fixture:
        return fixture.read()
24+
25+
26+
class TestZDIImporter(TestCase):
    """Unit tests for the ZDI RSS feed and advisory parsing helpers."""

    def test_parse_rss_feed_returns_correct_item_count(self):
        """The mock feed contains two <item> elements, so two dicts are returned."""
        parsed = parse_rss_feed(_load_rss())
        self.assertEqual(len(parsed), 2)

    def test_parse_rss_feed_item_fields(self):
        """The first parsed item exposes the title, link, description and pubDate text."""
        first, *_ = parse_rss_feed(_load_rss())
        self.assertEqual(
            first["title"],
            "ZDI-25-001: Example Vendor Product Remote Code Execution Vulnerability",
        )
        self.assertEqual(
            first["link"],
            "http://www.zerodayinitiative.com/advisories/ZDI-25-001/",
        )
        self.assertIn("CVE-2025-12345", first["description"])
        self.assertEqual(first["pub_date"], "Mon, 06 Jan 2025 00:00:00 -0600")

    def test_parse_advisory_with_cve(self):
        """An item with a CVE and a pubDate serializes to the expected JSON fixture."""
        feed_items = parse_rss_feed(_load_rss())
        advisory = parse_advisory_data(feed_items[0])
        self.assertIsNotNone(advisory)
        expected_file = os.path.join(TEST_DATA, "expected_zdi_advisory_output1.json")
        util_tests.check_results_against_json(advisory.to_dict(), expected_file)

    def test_parse_advisory_no_cve_has_empty_aliases(self):
        """An item whose description names no CVE yields an empty aliases list."""
        feed_items = parse_rss_feed(_load_rss())
        advisory = parse_advisory_data(feed_items[1])
        self.assertIsNotNone(advisory)
        self.assertEqual(advisory.advisory_id, "ZDI-25-002")
        self.assertEqual(advisory.aliases, [])

    def test_parse_advisory_missing_link_returns_none(self):
        """An item with an empty link carries no ZDI ID and must produce None."""
        item_without_link = dict(
            title="ZDI-25-999: Test Advisory",
            link="",
            description="Some description. CVE-2025-99999.",
            pub_date="Mon, 06 Jan 2025 00:00:00 -0600",
        )
        self.assertIsNone(parse_advisory_data(item_without_link))

    def test_parse_rss_feed_invalid_xml_returns_empty(self):
        """Malformed XML yields an empty list instead of raising."""
        self.assertEqual(parse_rss_feed("not valid xml <>>>"), [])

    def test_parse_advisory_zdi_id_not_in_aliases(self):
        """The ZDI ID is reported as advisory_id only; aliases hold just the CVE."""
        raw_item = dict(
            title="ZDI-25-100: Some Vulnerability",
            link="http://www.zerodayinitiative.com/advisories/ZDI-25-100/",
            description="CVSS 7.0. CVE-2025-11111.",
            pub_date="Wed, 08 Jan 2025 00:00:00 -0600",
        )
        advisory = parse_advisory_data(raw_item)
        self.assertIsNotNone(advisory)
        self.assertEqual(advisory.advisory_id, "ZDI-25-100")
        self.assertNotIn("ZDI-25-100", advisory.aliases)
        self.assertIn("CVE-2025-11111", advisory.aliases)

0 commit comments

Comments
 (0)