|
1 | 1 | import json |
2 | 2 | import logging |
3 | | -import re |
4 | 3 | from pathlib import Path |
5 | 4 | from typing import Iterable |
6 | 5 |
|
7 | | -import dateparser |
8 | 6 | from fetchcode.vcs import fetch_via_vcs |
9 | 7 |
|
10 | 8 | from vulnerabilities.importer import AdvisoryDataV2 |
11 | | -from vulnerabilities.importer import ReferenceV2 |
12 | | -from vulnerabilities.importer import VulnerabilitySeverity |
13 | | -from vulnerabilities.models import VulnerabilityReference |
14 | 9 | from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 |
15 | | -from vulnerabilities.severity_systems import SCORING_SYSTEMS |
| 10 | +from vulnerabilities.pipelines.v2_importers.cve_schema import parse_cve_advisory |
16 | 11 | from vulnerabilities.utils import get_advisory_url |
17 | | -from vulnerabilities.utils import get_cwe_id |
18 | | -from vulnerabilities.utils import get_reference_id |
19 | | -from vulnerabilities.utils import ssvc_calculator |
20 | 12 |
|
21 | 13 | logger = logging.getLogger(__name__) |
22 | 14 |
|
@@ -63,148 +55,7 @@ def collect_advisories(self) -> Iterable[AdvisoryDataV2]: |
63 | 55 | base_path=base_path, |
64 | 56 | url="https://github.com/cisagov/vulnrichment/blob/develop/", |
65 | 57 | ) |
66 | | - yield self.parse_cve_advisory(raw_data, advisory_url) |
67 | | - |
68 | | - def parse_cve_advisory(self, raw_data, advisory_url): |
69 | | - cve_metadata = raw_data.get("cveMetadata", {}) |
70 | | - cve_id = cve_metadata.get("cveId") |
71 | | - state = cve_metadata.get("state") |
72 | | - |
73 | | - date_published = cve_metadata.get("datePublished") |
74 | | - if date_published: |
75 | | - date_published = dateparser.parse( |
76 | | - date_published, |
77 | | - settings={ |
78 | | - "TIMEZONE": "UTC", |
79 | | - "RETURN_AS_TIMEZONE_AWARE": True, |
80 | | - "TO_TIMEZONE": "UTC", |
81 | | - }, |
82 | | - ) |
83 | | - |
84 | | - # Extract containers |
85 | | - containers = raw_data.get("containers", {}) |
86 | | - cna_data = containers.get("cna", {}) |
87 | | - adp_data = containers.get("adp", {}) |
88 | | - |
89 | | - # Extract descriptions |
90 | | - summary = "" |
91 | | - description_list = cna_data.get("descriptions", []) |
92 | | - for description_dict in description_list: |
93 | | - if not description_dict.get("lang") in ["en", "en-US"]: |
94 | | - continue |
95 | | - summary = description_dict.get("value") |
96 | | - |
97 | | - # Extract metrics |
98 | | - severities = [] |
99 | | - metrics = cna_data.get("metrics", []) + [ |
100 | | - adp_metrics for data in adp_data for adp_metrics in data.get("metrics", []) |
101 | | - ] |
102 | | - |
103 | | - vulnrichment_scoring_system = { |
104 | | - "cvssV4_0": SCORING_SYSTEMS["cvssv4"], |
105 | | - "cvssV3_1": SCORING_SYSTEMS["cvssv3.1"], |
106 | | - "cvssV3_0": SCORING_SYSTEMS["cvssv3"], |
107 | | - "cvssV2_0": SCORING_SYSTEMS["cvssv2"], |
108 | | - "other": { |
109 | | - "ssvc": SCORING_SYSTEMS["ssvc"], |
110 | | - }, # ignore kev |
111 | | - } |
112 | | - |
113 | | - for metric in metrics: |
114 | | - for metric_type, metric_value in metric.items(): |
115 | | - if metric_type not in vulnrichment_scoring_system: |
116 | | - continue |
117 | | - |
118 | | - if metric_type == "other": |
119 | | - other_types = metric_value.get("type") |
120 | | - if other_types == "ssvc": |
121 | | - content = metric_value.get("content", {}) |
122 | | - vector_string, decision = ssvc_calculator(content) |
123 | | - scoring_system = vulnrichment_scoring_system[metric_type][other_types] |
124 | | - severity = VulnerabilitySeverity( |
125 | | - system=scoring_system, value=decision, scoring_elements=vector_string |
126 | | - ) |
127 | | - severities.append(severity) |
128 | | - # ignore kev |
129 | | - else: |
130 | | - vector_string = metric_value.get("vectorString") |
131 | | - base_score = metric_value.get("baseScore") |
132 | | - scoring_system = vulnrichment_scoring_system[metric_type] |
133 | | - severity = VulnerabilitySeverity( |
134 | | - system=scoring_system, value=base_score, scoring_elements=vector_string |
135 | | - ) |
136 | | - severities.append(severity) |
137 | | - |
138 | | - # Extract references cpes and ignore affected products |
139 | | - cpes = set() |
140 | | - for affected_product in cna_data.get("affected", []): |
141 | | - if type(affected_product) != dict: |
142 | | - continue |
143 | | - cpes.update(affected_product.get("cpes") or []) |
144 | | - |
145 | | - references = [] |
146 | | - for ref in cna_data.get("references", []): |
147 | | - # https://github.com/CVEProject/cve-schema/blob/main/schema/tags/reference-tags.json |
148 | | - # We removed all unwanted reference types and set the default reference type to 'OTHER'. |
149 | | - ref_type = VulnerabilityReference.OTHER |
150 | | - vul_ref_types = { |
151 | | - "exploit": VulnerabilityReference.EXPLOIT, |
152 | | - "issue-tracking": VulnerabilityReference.BUG, |
153 | | - "mailing-list": VulnerabilityReference.MAILING_LIST, |
154 | | - "third-party-advisory": VulnerabilityReference.ADVISORY, |
155 | | - "vendor-advisory": VulnerabilityReference.ADVISORY, |
156 | | - "vdb-entry": VulnerabilityReference.ADVISORY, |
157 | | - } |
158 | | - |
159 | | - for tag_type in ref.get("tags", []): |
160 | | - if tag_type in vul_ref_types: |
161 | | - ref_type = vul_ref_types.get(tag_type) |
162 | | - |
163 | | - url = ref.get("url") |
164 | | - reference = ReferenceV2( |
165 | | - reference_id=get_reference_id(url), |
166 | | - url=url, |
167 | | - reference_type=ref_type, |
168 | | - ) |
169 | | - |
170 | | - references.append(reference) |
171 | | - |
172 | | - cpes_ref = [ |
173 | | - ReferenceV2( |
174 | | - reference_id=cpe, |
175 | | - reference_type=VulnerabilityReference.OTHER, |
176 | | - url=f"https://nvd.nist.gov/vuln/search/results?adv_search=true&isCpeNameSearch=true&query={cpe}", |
177 | | - ) |
178 | | - for cpe in sorted(list(cpes)) |
179 | | - ] |
180 | | - references.extend(cpes_ref) |
181 | | - |
182 | | - weaknesses = set() |
183 | | - for problem_type in cna_data.get("problemTypes", []): |
184 | | - descriptions = problem_type.get("descriptions", []) |
185 | | - for description in descriptions: |
186 | | - cwe_id = description.get("cweId") |
187 | | - if cwe_id: |
188 | | - weaknesses.add(get_cwe_id(cwe_id)) |
189 | | - |
190 | | - description_text = description.get("description") |
191 | | - if description_text: |
192 | | - pattern = r"CWE-(\d+)" |
193 | | - match = re.search(pattern, description_text) |
194 | | - if match: |
195 | | - weaknesses.add(int(match.group(1))) |
196 | | - |
197 | | - return AdvisoryDataV2( |
198 | | - advisory_id=cve_id, |
199 | | - aliases=[], |
200 | | - summary=summary, |
201 | | - references=references, |
202 | | - date_published=date_published, |
203 | | - weaknesses=sorted(weaknesses), |
204 | | - url=advisory_url, |
205 | | - severities=severities, |
206 | | - original_advisory_text=json.dumps(raw_data, indent=2, ensure_ascii=False), |
207 | | - ) |
| 58 | + yield parse_cve_advisory(raw_data, advisory_url) |
208 | 59 |
|
209 | 60 | def clean_downloads(self): |
210 | 61 | if self.vcs_response: |
|
0 commit comments