
Commit bb4736d

Switch libreoffice importer to HTML parsing
Parse advisory listing and individual advisory pages directly from libreoffice.org instead of calling cveawg.mitre.org. Drop unused JSON fixtures and update tests accordingly.

Signed-off-by: Anmol Vats <anmolvats2003@gmail.com>
1 parent 072379e commit bb4736d
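The new listing flow introduced in this commit can be exercised on its own. The snippet below is a minimal sketch, not part of the commit, and assumes the repository is on the Python path so the module added here imports as vulnerabilities.pipelines.v2_importers.libreoffice_importer:

import requests

from vulnerabilities.pipelines.v2_importers.libreoffice_importer import (
    ADVISORIES_URL,
    parse_advisory_urls,
)

# Fetch the LibreOffice advisories listing page, as the pipeline's fetch() step does.
resp = requests.get(ADVISORIES_URL, timeout=30)
resp.raise_for_status()

# parse_advisory_urls() extracts and deduplicates per-advisory URLs of the form
# https://www.libreoffice.org/about-us/security/advisories/cve-<year>-<number>/
for advisory_url in parse_advisory_urls(resp.text):
    print(advisory_url)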

File tree

4 files changed: +154 -333 lines changed


vulnerabilities/pipelines/v2_importers/libreoffice_importer.py

Lines changed: 57 additions & 79 deletions
@@ -7,36 +7,25 @@
 # See https://aboutcode.org for more information about nexB OSS projects.
 #
 
-import json
 import logging
+import re
 from typing import Iterable
 
 import dateparser
 import requests
+from bs4 import BeautifulSoup
 
 from vulnerabilities.importer import AdvisoryDataV2
 from vulnerabilities.importer import ReferenceV2
-from vulnerabilities.importer import VulnerabilitySeverity
 from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
-from vulnerabilities.severity_systems import SCORING_SYSTEMS
-from vulnerabilities.utils import find_all_cve
-from vulnerabilities.utils import get_cwe_id
 
 logger = logging.getLogger(__name__)
 
 ADVISORIES_URL = "https://www.libreoffice.org/about-us/security/advisories/"
-CVE_API_URL = "https://cveawg.mitre.org/api/cve/{cve_id}"
-
-CVSS_KEY_MAP = {
-    "cvssV4_0": SCORING_SYSTEMS["cvssv4"],
-    "cvssV3_1": SCORING_SYSTEMS["cvssv3.1"],
-    "cvssV3_0": SCORING_SYSTEMS["cvssv3"],
-    "cvssV2_0": SCORING_SYSTEMS["cvssv2"],
-}
 
 
 class LibreOfficeImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
-    """Collect LibreOffice security advisories via the CVE API."""
+    """Collect LibreOffice security advisories from libreoffice.org."""
 
     pipeline_id = "libreoffice_importer"
     spdx_license_expression = "LicenseRef-scancode-proprietary-license"
@@ -54,101 +43,90 @@ def fetch(self):
         self.log(f"Fetch `{ADVISORIES_URL}`")
         resp = requests.get(ADVISORIES_URL, timeout=30)
         resp.raise_for_status()
-        self.cve_ids = parse_cve_ids(resp.text)
+        self.advisory_urls = parse_advisory_urls(resp.text)
 
     def advisories_count(self):
-        return len(self.cve_ids)
+        return len(self.advisory_urls)
 
     def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
-        for cve_id in self.cve_ids:
-            url = CVE_API_URL.format(cve_id=cve_id)
+        for url in self.advisory_urls:
             try:
                 resp = requests.get(url, timeout=30)
                 resp.raise_for_status()
             except Exception as e:
-                logger.error("Failed to fetch CVE API for %s: %s", cve_id, e)
+                logger.error("Failed to fetch %s: %s", url, e)
                 continue
-            advisory = parse_cve_advisory(resp.json(), cve_id)
+            advisory = parse_advisory(resp.text, url)
            if advisory:
                yield advisory
 
 
-def parse_cve_ids(html: str) -> list:
-    """Return deduplicated CVE IDs from the LibreOffice advisories listing page."""
-    return list(dict.fromkeys(cve.upper() for cve in find_all_cve(html)))
+def parse_advisory_urls(html: str) -> list:
+    """Return deduplicated advisory page URLs from the listing page."""
+    slugs = re.findall(r"/about-us/security/advisories/(cve-[\d-]+)/", html)
+    seen = dict.fromkeys(slugs)
+    return [f"https://www.libreoffice.org/about-us/security/advisories/{slug}/" for slug in seen]
+
 
+def parse_advisory(html: str, url: str):
+    """Parse a LibreOffice individual advisory page; return None if advisory id is missing."""
+    soup = BeautifulSoup(html, features="lxml")
+    body = soup.find("body")
+    body_id = body.get("id", "") if body else ""
+    if not body_id.startswith("cve-"):
+        return None
+    advisory_id = body_id.upper()
 
-def parse_cve_advisory(data: dict, cve_id: str):
-    """Parse a CVE 5.0 JSON record from cveawg.mitre.org; return None if CVE ID is absent."""
-    cve_metadata = data.get("cveMetadata") or {}
-    advisory_id = cve_metadata.get("cveId") or cve_id
-    if not advisory_id:
+    content = soup.select_one("section#content1 div.margin-20")
+    if not content:
         return None
 
+    text = content.get_text(separator="\n")
+
+    title = _get_field(text, "Title")
+    date_str = _get_field(text, "Announced")
+
     date_published = None
-    raw_date = cve_metadata.get("datePublished") or ""
-    if raw_date:
+    if date_str:
         date_published = dateparser.parse(
-            raw_date,
+            date_str,
             settings={"TIMEZONE": "UTC", "RETURN_AS_TIMEZONE_AWARE": True, "TO_TIMEZONE": "UTC"},
         )
         if date_published is None:
-            logger.warning("Could not parse date %r for %s", raw_date, advisory_id)
-
-    cna = (data.get("containers") or {}).get("cna") or {}
+            logger.warning("Could not parse date %r for %s", date_str, advisory_id)
 
-    summary = ""
-    for desc in cna.get("descriptions") or []:
-        if desc.get("lang") in ("en", "en-US"):
-            summary = desc.get("value") or ""
-            break
-
-    severities = []
-    for metric in cna.get("metrics") or []:
-        for key, system in CVSS_KEY_MAP.items():
-            cvss = metric.get(key)
-            if not cvss:
-                continue
-            vector = cvss.get("vectorString") or ""
-            score = cvss.get("baseScore")
-            if vector and score is not None:
-                severities.append(
-                    VulnerabilitySeverity(
-                        system=system,
-                        value=str(score),
-                        scoring_elements=vector,
-                    )
-                )
-            break
-
-    weaknesses = []
-    for problem_type in cna.get("problemTypes") or []:
-        for desc in problem_type.get("descriptions") or []:
-            cwe_str = desc.get("cweId") or ""
-            if cwe_str.upper().startswith("CWE-"):
-                try:
-                    weaknesses.append(get_cwe_id(cwe_str))
-                except Exception:
-                    pass
-
-    advisory_url = (
-        f"https://www.libreoffice.org/about-us/security/advisories/{advisory_id.lower()}/"
+    desc_m = re.search(
+        r"Description\s*\n?\s*:\s*\n+(.*?)(?=\nCredits\b|\nReferences\b|$)",
+        text,
+        re.DOTALL,
     )
+    description = " ".join(desc_m.group(1).split()).strip() if desc_m else ""
+
     references = []
-    for ref in cna.get("references") or []:
-        url = ref.get("url") or ""
-        if url:
-            references.append(ReferenceV2(url=url))
+    in_refs = False
+    for tag in content.descendants:
+        tag_name = getattr(tag, "name", None)
+        if tag_name == "strong" and "References" in tag.get_text():
+            in_refs = True
+        if in_refs and tag_name == "a":
+            href = tag.get("href", "")
+            if href.startswith("http"):
+                references.append(ReferenceV2(url=href))
 
     return AdvisoryDataV2(
         advisory_id=advisory_id,
         aliases=[],
-        summary=summary,
+        summary=description or title,
         affected_packages=[],
         references=references,
         date_published=date_published,
-        weaknesses=weaknesses,
-        severities=severities,
-        url=advisory_url,
-        original_advisory_text=json.dumps(data, indent=2, ensure_ascii=False),
+        weaknesses=[],
+        severities=[],
+        url=url,
+        original_advisory_text=str(content),
     )
+
+
+def _get_field(text: str, label: str) -> str:
+    m = re.search(rf"{re.escape(label)}\s*:\s*\n?\s*([^\n]+)", text)
+    return m.group(1).strip() if m else ""
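For reference, a hedged usage sketch of the new page parser. The HTML fragment below is made up to mimic the layout the parser expects (a cve- body id, section#content1 div.margin-20, Title/Announced/Description fields, and a References block); the advisory id, URL, and reference in it are illustrative only, and lxml plus the repository modules are assumed to be importable:

from vulnerabilities.pipelines.v2_importers.libreoffice_importer import parse_advisory

# Made-up advisory page fragment; the real pipeline fetches pages in collect_advisories().
HTML = """
<html>
  <body id="cve-2024-0000">
    <section id="content1">
      <div class="margin-20">
        <p>Title: Example flaw in LibreOffice</p>
        <p>Announced: January 1, 2024</p>
        <p>Description:</p>
        <p>An example description of the flaw.</p>
        <p><strong>References</strong></p>
        <p><a href="https://www.example.org/advisory">Upstream report</a></p>
      </div>
    </section>
  </body>
</html>
"""

advisory = parse_advisory(
    HTML, "https://www.libreoffice.org/about-us/security/advisories/cve-2024-0000/"
)
assert advisory is not None
print(advisory.advisory_id)   # CVE-2024-0000 (upper-cased body id)
print(advisory.summary)       # the Description text, falling back to Title if absent
print(advisory.references)    # one ReferenceV2 for https://www.example.org/advisory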
