1414from typing import Iterable
1515from xml .etree import ElementTree
1616
17+ import dateparser
18+
1719from vulnerabilities .importer import AdvisoryDataV2
1820from vulnerabilities .importer import ReferenceV2
21+ from vulnerabilities .importer import VulnerabilitySeverity
1922from vulnerabilities .pipelines import VulnerableCodeBaseImporterPipelineV2
23+ from vulnerabilities .severity_systems import GENERIC
2024from vulnerabilities .utils import fetch_response
25+ from vulnerabilities .utils import find_all_cve
2126
2227logger = logging .getLogger (__name__ )
2328
2429ZDI_RSS_YEAR_URL = "https://www.zerodayinitiative.com/rss/published/{year}/"
2530ZDI_START_YEAR = 2007
2631ZDI_ID_RE = re .compile (r"ZDI-\d+-\d+" )
27- CVE_RE = re .compile (r"CVE-\d{4}-\d{4,7}" )
28- PUBDATE_FORMAT = "%a, %d %b %Y %H:%M:%S %z"
32+ CVSS_RE = re .compile (r"CVSS rating of (\d+\.?\d*)" )
2933
3034
3135class ZDIImporterPipeline (VulnerableCodeBaseImporterPipelineV2 ):
@@ -34,7 +38,6 @@ class ZDIImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
3438 pipeline_id = "zdi_importer"
3539 spdx_license_expression = "LicenseRef-scancode-proprietary-license"
3640 license_url = "https://www.zerodayinitiative.com"
37- repo_url = "https://www.zerodayinitiative.com"
3841 precedence = 200
3942
4043 @classmethod
@@ -50,7 +53,6 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
5053 ZDI_RSS_YEAR_URL .format (year = year ) for year in range (ZDI_START_YEAR , current_year + 1 )
5154 ]
5255
53- seen_ids = set ()
5456 for url in urls :
5557 self .log (f"Fetching ZDI RSS feed: { url } " )
5658 try :
@@ -62,8 +64,7 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
6264
6365 for item in items :
6466 advisory = parse_advisory_data (item )
65- if advisory and advisory .advisory_id not in seen_ids :
66- seen_ids .add (advisory .advisory_id )
67+ if advisory :
6768 yield advisory
6869
6970
@@ -115,15 +116,19 @@ def parse_advisory_data(item: dict):
115116 return None
116117
117118 advisory_id = match .group (0 )
118- aliases = list (dict .fromkeys (CVE_RE . findall (description )))
119+ aliases = list (dict .fromkeys (find_all_cve (description )))
119120
120121 date_published = None
121122 if pub_date_str :
122- try :
123- date_published = datetime .strptime (pub_date_str , PUBDATE_FORMAT )
124- except ValueError :
123+ date_published = dateparser .parse (pub_date_str )
124+ if date_published is None :
125125 logger .warning ("Could not parse date %r for advisory %s" , pub_date_str , advisory_id )
126126
127+ severities = []
128+ cvss_match = CVSS_RE .search (description )
129+ if cvss_match :
130+ severities .append (VulnerabilitySeverity (system = GENERIC , value = cvss_match .group (1 )))
131+
127132 references = []
128133 if link :
129134 references .append (ReferenceV2 (url = link ))
@@ -135,5 +140,6 @@ def parse_advisory_data(item: dict):
135140 affected_packages = [],
136141 references = references ,
137142 date_published = date_published ,
143+ severities = severities ,
138144 url = link ,
139145 )