Skip to content

Commit b9471ce

Browse files
committed
Migrate Alpine importer to advisory V2
Signed-off-by: ziad hany <ziadhany2016@gmail.com>
1 parent df91a2c commit b9471ce

File tree

4 files changed

+1398
-0
lines changed

4 files changed

+1398
-0
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
from vulnerabilities.pipelines import nvd_importer
4242
from vulnerabilities.pipelines import pypa_importer
4343
from vulnerabilities.pipelines import pysec_importer
44+
from vulnerabilities.pipelines.v2_importers import alpine_linux_importer as alpine_linux_importer_v2
4445
from vulnerabilities.pipelines.v2_importers import aosp_importer as aosp_importer_v2
4546
from vulnerabilities.pipelines.v2_importers import apache_httpd_importer as apache_httpd_v2
4647
from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2
@@ -90,6 +91,7 @@
9091
ruby_importer_v2.RubyImporterPipeline,
9192
epss_importer_v2.EPSSImporterPipeline,
9293
mattermost_importer_v2.MattermostImporterPipeline,
94+
alpine_linux_importer_v2.AlpineLinuxImporterPipeline,
9395
nvd_importer.NVDImporterPipeline,
9496
github_importer.GitHubAPIImporterPipeline,
9597
gitlab_importer.GitLabImporterPipeline,
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import logging
11+
from typing import Any
12+
from typing import Iterable
13+
from typing import List
14+
from typing import Mapping
15+
from urllib.parse import urljoin
16+
17+
from bs4 import BeautifulSoup
18+
from packageurl import PackageURL
19+
from univers.version_range import AlpineLinuxVersionRange
20+
from univers.versions import InvalidVersion
21+
22+
from vulnerabilities.importer import AdvisoryData
23+
from vulnerabilities.importer import AffectedPackageV2
24+
from vulnerabilities.importer import ReferenceV2
25+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipeline
26+
from vulnerabilities.references import WireSharkReferenceV2
27+
from vulnerabilities.references import XsaReferenceV2
28+
from vulnerabilities.references import ZbxReferenceV2
29+
from vulnerabilities.utils import fetch_response
30+
31+
32+
class AlpineLinuxImporterPipeline(VulnerableCodeBaseImporterPipeline):
    """
    Collect Alpine Linux advisories.

    The index page at ``url`` is fetched and scanned for advisory directory
    links; every JSON file linked from those directories is then fetched and
    converted to AdvisoryData by ``process_record``.
    """

    pipeline_id = "alpine_linux_importer_v2"

    spdx_license_expression = "CC-BY-SA-4.0"
    license_url = "https://secdb.alpinelinux.org/license.txt"
    url = "https://secdb.alpinelinux.org/"
    importer_name = "Alpine Linux Importer"

    @classmethod
    def steps(cls):
        """Return the tuple of pipeline steps to run."""
        return (cls.collect_and_store_advisories,)

    def advisories_count(self) -> int:
        """Return 0: this importer does not compute a count before collecting."""
        return 0

    def collect_advisories(self) -> Iterable[AdvisoryData]:
        """
        Yield AdvisoryData for every JSON advisory file reachable from ``url``.

        Records without a non-empty "packages" entry are logged at DEBUG level
        and skipped.
        """
        index_page = fetch_response(self.url).content
        directory_links = fetch_advisory_directory_links(index_page, self.url, self.log)

        # Gather every JSON link first, then fetch and process them one by one.
        advisory_links = []
        for directory_link in directory_links:
            directory_page = fetch_response(directory_link).content
            advisory_links.extend(fetch_advisory_links(directory_page, directory_link, self.log))

        for link in advisory_links:
            record = fetch_response(link).json()
            # .get(): a record lacking the "packages" key must be skipped like
            # an empty one instead of aborting the whole run with a KeyError.
            if not record.get("packages"):
                self.log(
                    f'"packages" not found in {link!r}',
                    level=logging.DEBUG,
                )
                continue
            yield from process_record(record=record, url=link, logger=self.log)
69+
70+
71+
def fetch_advisory_directory_links(
    page_response_content: str,
    base_url: str,
    logger: callable = None,
) -> List[str]:
    """
    Return the advisory directory URLs found in the `page_response_content` html string.

    An anchor is kept when its text starts with "v" or "edge"; that text is
    joined onto `base_url` to form the directory URL. When no anchor matches,
    a DEBUG message is sent to `logger` (if provided) and an empty list is
    returned.
    """
    soup = BeautifulSoup(page_response_content, features="lxml")

    directory_links = []
    for anchor in soup.find_all("a"):
        label = anchor.text
        if label.startswith(("v", "edge")):
            directory_links.append(urljoin(base_url, label))

    if directory_links:
        return directory_links

    if logger:
        logger(
            f"No versions found in {base_url!r}",
            level=logging.DEBUG,
        )
    return []
97+
98+
99+
def fetch_advisory_links(
    advisory_directory_page: str,
    advisory_directory_link: str,
    logger: callable = None,
) -> Iterable[str]:
    """
    Yield the URL of each json file linked from `advisory_directory_page`.

    An anchor is kept when its text ends with "json"; that text is joined onto
    `advisory_directory_link`. When the page has no anchors at all, a DEBUG
    message is sent to `logger` (if provided) and nothing is yielded.
    """
    soup = BeautifulSoup(advisory_directory_page, features="lxml")
    anchors = soup.find_all("a")

    if anchors:
        for anchor in anchors:
            file_name = anchor.text
            if file_name.endswith("json"):
                yield urljoin(advisory_directory_link, file_name)
        return

    if logger:
        logger(
            f"No anchor tags found in {advisory_directory_link!r}",
            level=logging.DEBUG,
        )
119+
120+
121+
def check_for_attributes(record, logger) -> bool:
    """
    Return True if `record` contains "distroversion", "reponame" and "archs".

    On the first missing attribute (checked in that order), send a DEBUG
    message to `logger` (if truthy) and return False.
    """
    required = ("distroversion", "reponame", "archs")
    missing = next((name for name in required if name not in record), None)
    if missing is None:
        return True

    if logger:
        logger(
            f'"{missing!r}" not found in {record!r}',
            level=logging.DEBUG,
        )
    return False
132+
133+
134+
def process_record(record: dict, url: str, logger: callable = None) -> "Iterable[AdvisoryData]":
    """
    Yield AdvisoryData objects built from the packages listed in `record`.

    `record` is the decoded content of one advisory json file and `url` is
    where it was fetched from. Nothing is yielded when `record` has no
    "packages". A package entry with a missing or empty "pkg" is skipped, and
    so is every package when `record` lacks one of the attributes required by
    `check_for_attributes`. Skips are reported to `logger` (if provided) at
    DEBUG level.
    """
    packages = record.get("packages")
    if not packages:
        if logger:
            logger(
                f'"packages" not found in this record {record!r}',
                level=logging.DEBUG,
            )
        return

    for package in packages:
        # .get(): an entry without a "pkg" key is skipped like an empty one
        # rather than raising KeyError and aborting the remaining packages.
        pkg_infos = package.get("pkg")
        if not pkg_infos:
            if logger:
                logger(
                    f'"pkg" not found in this package {package!r}',
                    level=logging.DEBUG,
                )
            continue
        if not check_for_attributes(record, logger):
            continue
        yield from load_advisories(
            pkg_infos=pkg_infos,
            distroversion=record["distroversion"],
            reponame=record["reponame"],
            archs=record["archs"],
            url=url,
            logger=logger,
        )
165+
166+
167+
def _alpine_references(vuln_ids):
    """Return the references derived from the identifier prefixes in `vuln_ids`."""
    references = []
    for reference_id in vuln_ids:
        if reference_id.startswith("XSA"):
            references.append(XsaReferenceV2.from_id(xsa_id=reference_id))
        elif reference_id.startswith("ZBX"):
            references.append(ZbxReferenceV2.from_id(zbx_id=reference_id))
        elif reference_id.startswith("wnpa-sec"):
            references.append(WireSharkReferenceV2.from_id(wnpa_sec_id=reference_id))
        elif reference_id.startswith("CVE"):
            references.append(
                ReferenceV2(
                    reference_id=reference_id,
                    url=f"https://nvd.nist.gov/vuln/detail/{reference_id}",
                )
            )
    return references


def _alpine_affected_packages(name, distroversion, reponame, archs, fixed_version_range):
    """Return one AffectedPackageV2 per arch, or a single arch-less one when `archs` is empty."""
    if not fixed_version_range:
        return []

    base_qualifiers = {"distroversion": distroversion, "reponame": reponame}
    # Each purl gets its own qualifiers dict so no mapping is shared or mutated
    # across packages. With no arch, this is not an arch-specific package.
    qualifier_sets = [{**base_qualifiers, "arch": arch} for arch in archs] or [base_qualifiers]
    return [
        AffectedPackageV2(
            package=PackageURL(
                type="apk",
                namespace="alpine",
                name=name,
                qualifiers=qualifiers,
            ),
            fixed_version_range=fixed_version_range,
        )
        for qualifiers in qualifier_sets
    ]


def load_advisories(
    pkg_infos: Mapping[str, Any],
    distroversion: str,
    reponame: str,
    archs: List[str],
    url: str,
    logger: callable = None,
) -> "Iterable[AdvisoryData]":
    """
    Yield AdvisoryData by mapping data from `pkg_infos`
    and form PURL for AffectedPackages by using
    `distroversion`, `reponame`, `archs`

    `pkg_infos["secfixes"]` maps a fixed version to a list of strings, each a
    space-separated group of identifiers. One AdvisoryData is yielded per
    identifier, carrying the references and affected packages of its whole
    group. Nothing is yielded when "name" is missing, when `archs` is not a
    list, or when "secfixes" is missing or empty. A version that fails to
    parse still yields advisories, with no affected packages. Skipped input
    is reported to `logger` (if provided) at DEBUG level.
    """
    name = pkg_infos.get("name")
    if not name:
        if logger:
            logger(
                f'"name" is not available in package {pkg_infos!r}',
                level=logging.DEBUG,
            )
        return

    # `archs` does not change while iterating, so validate it once up front;
    # a non-list value means no advisory can be produced for this package.
    if not isinstance(archs, list):
        if logger:
            logger(
                f"{archs!r} is not of `List` instance",
                level=logging.DEBUG,
            )
        return

    # A package without "secfixes" (or with a null one) has nothing to report.
    secfixes = pkg_infos.get("secfixes") or {}
    for version, fixed_vulns in secfixes.items():
        if not fixed_vulns:
            if logger:
                logger(
                    f"No fixed vulnerabilities in version {version!r}",
                    level=logging.DEBUG,
                )
            continue
        # fixed_vulns is a list of strings and each string is a space-separated
        # list of aliases and CVES
        for vuln_ids in fixed_vulns:
            if not isinstance(vuln_ids, str):
                if logger:
                    logger(
                        f"{vuln_ids!r} is not of `str` instance",
                        level=logging.DEBUG,
                    )
                continue
            vuln_ids = vuln_ids.strip().split()
            if not vuln_ids:
                if logger:
                    logger(
                        f"{vuln_ids!r} is empty",
                        level=logging.DEBUG,
                    )
                continue

            references = _alpine_references(vuln_ids)

            fixed_version_range = None
            try:
                fixed_version_range = AlpineLinuxVersionRange.from_versions([version])
            except InvalidVersion as e:
                if logger:
                    logger(
                        f"{version!r} is not a valid AlpineVersion {e!r}",
                        level=logging.DEBUG,
                    )

            affected_packages = _alpine_affected_packages(
                name=name,
                distroversion=distroversion,
                reponame=reponame,
                archs=archs,
                fixed_version_range=fixed_version_range,
            )

            for advisory_id in vuln_ids:
                # Hand each advisory its own list objects so a later mutation
                # of one advisory cannot leak into its siblings.
                yield AdvisoryData(
                    advisory_id=advisory_id,
                    aliases=[],
                    references_v2=list(references),
                    affected_packages=list(affected_packages),
                    url=url,
                )

vulnerabilities/references.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#
99

1010
from vulnerabilities.importer import Reference
11+
from vulnerabilities.importer import ReferenceV2
1112

1213

1314
class XsaReference(Reference):
@@ -75,3 +76,70 @@ def from_id(cls, wnpa_sec_id):
7576
reference_id=wnpa_sec_id,
7677
url=f"https://www.wireshark.org/security/{wnpa_sec_id}.html",
7778
)
79+
80+
81+
class XsaReferenceV2:
    """
    A Xen advisory reference. See https://xenbits.xen.org/xsa
    """

    @classmethod
    def from_id(cls, xsa_id):
        """
        Return a new ReferenceV2 from an XSA-XXXX id.

        Raise ValueError when `xsa_id` is empty, does not start with "XSA"
        (case-insensitive), or is not made of exactly two "-"-separated parts.
        """
        if not xsa_id or not xsa_id.lower().startswith("xsa"):
            # Raise, do not return: callers expect a reference object back.
            raise ValueError(f"Not a Xen reference. Does not start with XSA: {xsa_id!r}")
        parts = xsa_id.split("-")
        if len(parts) != 2:
            raise ValueError(f"Not a Xen reference. Expected the form XSA-<number>: {xsa_id!r}")
        numid = parts[1]
        return ReferenceV2(
            reference_id=xsa_id,
            url=f"https://xenbits.xen.org/xsa/advisory-{numid}.html",
        )

    @classmethod
    def from_number(cls, number):
        """
        Return a new ReferenceV2 from an XSA number.
        """
        return ReferenceV2(
            reference_id=f"XSA-{number}",
            url=f"https://xenbits.xen.org/xsa/advisory-{number}.html",
        )
108+
109+
110+
class ZbxReferenceV2:
    """
    A Zabbix advisory reference. See https://support.zabbix.com
    """

    @classmethod
    def from_id(cls, zbx_id):
        """
        Return a new ReferenceV2 from a ZBX-XXXX id.

        Raise ValueError when `zbx_id` is empty or does not start with "ZBX"
        (case-insensitive).
        """
        if not zbx_id or not zbx_id.lower().startswith("zbx"):
            # Raise, do not return: callers expect a reference object back.
            raise ValueError(f"Not a Zabbix reference. Does not start with ZBX: {zbx_id!r}")
        return ReferenceV2(
            reference_id=zbx_id,
            url=f"https://support.zabbix.com/browse/{zbx_id}",
        )
126+
127+
128+
class WireSharkReferenceV2:
    """
    A Wireshark advisory reference. See https://www.wireshark.org/security
    """

    @classmethod
    def from_id(cls, wnpa_sec_id):
        """
        Return a new ReferenceV2 from a wnpa-sec-XXXX id.

        Raise ValueError when `wnpa_sec_id` is empty or does not start with
        "wnpa-sec" (case-insensitive).
        """
        if not wnpa_sec_id or not wnpa_sec_id.lower().startswith("wnpa-sec"):
            # Raise, do not return: callers expect a reference object back.
            raise ValueError(
                f"Not a WireShark reference. Does not start with wnpa-sec: {wnpa_sec_id!r}"
            )
        return ReferenceV2(
            reference_id=wnpa_sec_id,
            url=f"https://www.wireshark.org/security/{wnpa_sec_id}.html",
        )

0 commit comments

Comments
 (0)