Skip to content

Commit 515784e

Browse files
committed
Modify Nginx pipeline importer to support package-first mode #1916
* Update Nginx importer to filter and process advisories relevant to the purl passed in the constructor * Update Nginx importer tests to include testing the package-first mode Signed-off-by: Michael Ehab Mikhail <michael.ehab@hotmail.com>
1 parent a05b65e commit 515784e

File tree

3 files changed

+476
-2
lines changed

3 files changed

+476
-2
lines changed

vulnerabilities/pipelines/nginx_importer.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@ class NginxImporterPipeline(VulnerableCodeBaseImporterPipeline):
3434
url = "https://nginx.org/en/security_advisories.html"
3535
importer_name = "Nginx Importer"
3636

37+
is_batch_run = True
38+
39+
def __init__(self, *args, purl=None, **kwargs):
40+
super().__init__(*args, **kwargs)
41+
self.purl = purl
42+
if self.purl:
43+
NginxImporterPipeline.is_batch_run = False
44+
if self.purl.type != "nginx":
45+
print(
46+
f"Warning: PURL type {self.purl.type} is not 'nginx', may not match any advisories"
47+
)
48+
3749
@classmethod
3850
def steps(cls):
3951
return (
@@ -57,8 +69,46 @@ def collect_advisories(self) -> Iterable[AdvisoryData]:
5769
soup = BeautifulSoup(self.advisory_data, features="lxml")
5870
vulnerability_list = soup.select("li p")
5971
for vulnerability_info in vulnerability_list:
60-
ngnix_advisory = parse_advisory_data_from_paragraph(vulnerability_info)
61-
yield to_advisory_data(ngnix_advisory)
72+
nginx_advisory = parse_advisory_data_from_paragraph(vulnerability_info)
73+
advisory_data = to_advisory_data(nginx_advisory)
74+
75+
if self.purl and not self._is_advisory_affecting_purl(advisory_data):
76+
continue
77+
78+
yield advisory_data
79+
80+
def _is_advisory_affecting_purl(self, advisory_data):
81+
if not self.purl:
82+
return True
83+
84+
if self.purl.type != "nginx":
85+
return False
86+
87+
for affected_package in advisory_data.affected_packages:
88+
if affected_package.package.type != self.purl.type:
89+
continue
90+
91+
if affected_package.package.name != self.purl.name:
92+
continue
93+
94+
if self.purl.qualifiers and "os" in self.purl.qualifiers:
95+
if (
96+
not affected_package.package.qualifiers
97+
or "os" not in affected_package.package.qualifiers
98+
or affected_package.package.qualifiers["os"] != self.purl.qualifiers["os"]
99+
):
100+
continue
101+
102+
if self.purl.version:
103+
purl_version = NginxVersion(self.purl.version)
104+
105+
affected_range = affected_package.affected_version_range
106+
if affected_range and purl_version not in affected_range:
107+
continue
108+
109+
return True
110+
111+
return False
62112

63113

64114
class NginxAdvisory(NamedTuple):

vulnerabilities/tests/pipelines/test_nginx_importer_pipeline.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from bs4 import BeautifulSoup
1616
from commoncode import testcase
1717
from django.db.models.query import QuerySet
18+
from packageurl import PackageURL
1819
from univers.version_range import NginxVersionRange
1920

2021
from vulnerabilities import models
@@ -239,3 +240,41 @@ def test_NginxBasicImprover__get_inferences_from_versions_end_to_end(self):
239240
"improver/improver-inferences-expected.json", must_exist=False
240241
)
241242
util_tests.check_results_against_json(results, expected_file)
243+
244+
def test_nginx_importer_package_first_mode_found(self):
245+
test_file = self.get_test_loc("security_advisories.html")
246+
with open(test_file) as tf:
247+
test_text = tf.read()
248+
249+
purl = PackageURL(type="nginx", name="nginx", version="1.17.2")
250+
pipeline = nginx_importer.NginxImporterPipeline(purl=purl)
251+
pipeline.advisory_data = test_text
252+
advisories = list(pipeline.collect_advisories())
253+
expected_file = self.get_test_loc("security_advisories-single-version-expected.json")
254+
advisories_dicts = [a.to_dict() for a in advisories]
255+
util_tests.check_results_against_json(advisories_dicts, expected_file)
256+
257+
def test_nginx_importer_package_first_mode_none_found(self):
258+
test_file = self.get_test_loc("security_advisories.html")
259+
with open(test_file) as tf:
260+
test_text = tf.read()
261+
262+
purl = PackageURL(type="nginx", name="nonexistent")
263+
pipeline = nginx_importer.NginxImporterPipeline(purl=purl)
264+
pipeline.advisory_data = test_text
265+
advisories = list(pipeline.collect_advisories())
266+
assert advisories == []
267+
268+
def test_nginx_importer_package_first_mode_with_os_qualifier(self):
269+
test_file = self.get_test_loc("security_advisories.html")
270+
with open(test_file) as tf:
271+
test_text = tf.read()
272+
273+
purl = PackageURL(type="nginx", name="nginx", qualifiers={"os": "windows"})
274+
pipeline = nginx_importer.NginxImporterPipeline(purl=purl)
275+
pipeline.advisory_data = test_text
276+
advisories = list(pipeline.collect_advisories())
277+
278+
for adv in advisories:
279+
for pkg in adv.affected_packages:
280+
assert pkg.package.qualifiers.get("os") == "windows"

0 commit comments

Comments
 (0)