Skip to content

Commit 6b850b3

Browse files
committed
Add support for CVEListV5
Use the Vulnrichment parser schema Add a CVE schema parser to handle both CVEListV5 and Vulnrichment Signed-off-by: ziad hany <ziadhany2016@gmail.com>
1 parent dcb0511 commit 6b850b3

21 files changed

Lines changed: 1073 additions & 275 deletions

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from vulnerabilities.pipelines.v2_importers import apache_httpd_importer as apache_httpd_v2
4545
from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2
4646
from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2
47+
from vulnerabilities.pipelines.v2_importers import cvelistv5_importer as cvelistv5_importer_v2
4748
from vulnerabilities.pipelines.v2_importers import (
4849
elixir_security_importer as elixir_security_importer_v2,
4950
)
@@ -69,6 +70,7 @@
6970
elixir_security_importer_v2.ElixirSecurityImporterPipeline,
7071
npm_importer_v2.NpmImporterPipeline,
7172
vulnrichment_importer_v2.VulnrichImporterPipeline,
73+
cvelistv5_importer_v2.CVEListV5ImporterPipeline,
7274
apache_httpd_v2.ApacheHTTPDImporterPipeline,
7375
pypa_importer_v2.PyPaImporterPipeline,
7476
gitlab_importer_v2.GitLabImporterPipeline,
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
import json
10+
import re
11+
12+
import dateparser
13+
14+
from vulnerabilities.importer import AdvisoryData
15+
from vulnerabilities.importer import ReferenceV2
16+
from vulnerabilities.importer import VulnerabilitySeverity
17+
from vulnerabilities.models import VulnerabilityReference
18+
from vulnerabilities.severity_systems import SCORING_SYSTEMS
19+
from vulnerabilities.utils import get_cwe_id
20+
from vulnerabilities.utils import get_reference_id
21+
from vulnerabilities.utils import ssvc_calculator
22+
23+
24+
def parse_cve_v5_advisory(raw_data, advisory_url):
25+
cve_metadata = raw_data.get("cveMetadata", {})
26+
cve_id = cve_metadata.get("cveId")
27+
28+
date_published = cve_metadata.get("datePublished")
29+
if date_published:
30+
date_published = dateparser.parse(
31+
date_published,
32+
settings={
33+
"TIMEZONE": "UTC",
34+
"RETURN_AS_TIMEZONE_AWARE": True,
35+
"TO_TIMEZONE": "UTC",
36+
},
37+
)
38+
39+
# Extract containers
40+
containers = raw_data.get("containers", {})
41+
cna_data = containers.get("cna", {})
42+
adp_data = containers.get("adp", {})
43+
44+
# Extract descriptions
45+
summary = ""
46+
description_list = cna_data.get("descriptions", [])
47+
for description_dict in description_list:
48+
if not description_dict.get("lang") in ["en", "en-US"]:
49+
continue
50+
summary = description_dict.get("value")
51+
52+
# Extract metrics
53+
severities = []
54+
metrics = cna_data.get("metrics", []) + [
55+
adp_metrics for data in adp_data for adp_metrics in data.get("metrics", [])
56+
]
57+
58+
cve_scoring_system = {
59+
"cvssV4_0": SCORING_SYSTEMS["cvssv4"],
60+
"cvssV3_1": SCORING_SYSTEMS["cvssv3.1"],
61+
"cvssV3_0": SCORING_SYSTEMS["cvssv3"],
62+
"cvssV2_0": SCORING_SYSTEMS["cvssv2"],
63+
"other": {
64+
"ssvc": SCORING_SYSTEMS["ssvc"],
65+
}, # ignore kev
66+
}
67+
68+
for metric in metrics:
69+
for metric_type, metric_value in metric.items():
70+
if metric_type not in cve_scoring_system:
71+
continue
72+
73+
if metric_type == "other":
74+
other_types = metric_value.get("type")
75+
if other_types == "ssvc":
76+
content = metric_value.get("content", {})
77+
vector_string, decision = ssvc_calculator(content)
78+
scoring_system = cve_scoring_system[metric_type][other_types]
79+
severity = VulnerabilitySeverity(
80+
system=scoring_system, value=decision, scoring_elements=vector_string
81+
)
82+
severities.append(severity)
83+
# ignore kev
84+
else:
85+
vector_string = metric_value.get("vectorString")
86+
base_score = metric_value.get("baseScore")
87+
scoring_system = cve_scoring_system[metric_type]
88+
severity = VulnerabilitySeverity(
89+
system=scoring_system, value=base_score, scoring_elements=vector_string
90+
)
91+
severities.append(severity)
92+
93+
# Extract references cpes and ignore affected products
94+
cpes = set()
95+
for affected_product in cna_data.get("affected", []):
96+
if type(affected_product) != dict:
97+
continue
98+
cpes.update(affected_product.get("cpes") or [])
99+
100+
references = []
101+
for ref in cna_data.get("references", []):
102+
# https://github.com/CVEProject/cve-schema/blob/main/schema/tags/reference-tags.json
103+
# We removed all unwanted reference types and set the default reference type to 'OTHER'.
104+
ref_type = VulnerabilityReference.OTHER
105+
vul_ref_types = {
106+
"exploit": VulnerabilityReference.EXPLOIT,
107+
"issue-tracking": VulnerabilityReference.BUG,
108+
"mailing-list": VulnerabilityReference.MAILING_LIST,
109+
"third-party-advisory": VulnerabilityReference.ADVISORY,
110+
"vendor-advisory": VulnerabilityReference.ADVISORY,
111+
"vdb-entry": VulnerabilityReference.ADVISORY,
112+
}
113+
114+
for tag_type in ref.get("tags", []):
115+
if tag_type in vul_ref_types:
116+
ref_type = vul_ref_types.get(tag_type)
117+
118+
url = ref.get("url")
119+
reference = ReferenceV2(
120+
reference_id=get_reference_id(url),
121+
url=url,
122+
reference_type=ref_type,
123+
)
124+
125+
references.append(reference)
126+
127+
cpes_ref = [
128+
ReferenceV2(
129+
reference_id=cpe,
130+
reference_type=VulnerabilityReference.OTHER,
131+
url=f"https://nvd.nist.gov/vuln/search/results?adv_search=true&isCpeNameSearch=true&query={cpe}",
132+
)
133+
for cpe in sorted(list(cpes))
134+
]
135+
references.extend(cpes_ref)
136+
137+
weaknesses = set()
138+
for problem_type in cna_data.get("problemTypes", []):
139+
descriptions = problem_type.get("descriptions", [])
140+
for description in descriptions:
141+
cwe_id = description.get("cweId")
142+
if cwe_id:
143+
weaknesses.add(get_cwe_id(cwe_id))
144+
145+
description_text = description.get("description")
146+
if description_text:
147+
pattern = r"CWE-(\d+)"
148+
match = re.search(pattern, description_text)
149+
if match:
150+
weaknesses.add(int(match.group(1)))
151+
152+
return AdvisoryData(
153+
advisory_id=cve_id,
154+
aliases=[],
155+
summary=summary,
156+
references_v2=references,
157+
date_published=date_published,
158+
weaknesses=sorted(weaknesses),
159+
url=advisory_url,
160+
severities=severities,
161+
original_advisory_text=json.dumps(raw_data, indent=2, ensure_ascii=False),
162+
)
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
import json
10+
import logging
11+
from pathlib import Path
12+
from typing import Iterable
13+
14+
from fetchcode.vcs import fetch_via_vcs
15+
16+
from vulnerabilities.importer import AdvisoryData
17+
from vulnerabilities.importers.cve_schema import parse_cve_v5_advisory
18+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
19+
from vulnerabilities.utils import get_advisory_url
20+
21+
logger = logging.getLogger(__name__)
22+
23+
24+
class CVEListV5ImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
25+
pipeline_id = "cvelistv5_importer_v2"
26+
# license PR: https://github.com/CVEProject/cvelistV5/pull/65
27+
spdx_license_expression = "CC0-1.0"
28+
license_url = "https://github.com/CVEProject/cvelistV5/blob/main/LICENSE"
29+
repo_url = "git+https://github.com/CVEProject/cvelistV5"
30+
31+
@classmethod
32+
def steps(cls):
33+
return (
34+
cls.clone,
35+
cls.collect_and_store_advisories,
36+
cls.clean_downloads,
37+
)
38+
39+
def clone(self):
40+
self.log(f"Cloning `{self.repo_url}`")
41+
self.vcs_response = fetch_via_vcs(self.repo_url)
42+
43+
def advisories_count(self):
44+
vuln_directory = Path(self.vcs_response.dest_dir) / "cves"
45+
return sum(1 for _ in vuln_directory.glob("*.json"))
46+
47+
def collect_advisories(self) -> Iterable[AdvisoryData]:
48+
base_directory = Path(self.vcs_response.dest_dir)
49+
vulns_directory = base_directory / "cves"
50+
51+
for file in vulns_directory.rglob("*.json"):
52+
if not file.name.startswith("CVE-"):
53+
continue
54+
55+
advisory_url = get_advisory_url(
56+
file=file,
57+
base_path=base_directory,
58+
url="https://github.com/CVEProject/cvelistV5/blob/main/",
59+
)
60+
61+
with open(file) as f:
62+
raw_data = json.load(f)
63+
yield parse_cve_v5_advisory(raw_data, advisory_url)
64+
65+
def clean_downloads(self):
66+
if self.vcs_response:
67+
self.log(f"Removing cloned repository")
68+
self.vcs_response.delete()
69+
70+
def on_failure(self):
71+
self.clean_downloads()

0 commit comments

Comments
 (0)