Skip to content

Commit c66d400

Browse files
committed
Add tests for V2 Importer Pipeline
Signed-off-by: Tushar Goel <tushar.goel.dav@gmail.com>
1 parent d1e8c54 commit c66d400

File tree

1 file changed

+180
-0
lines changed

1 file changed

+180
-0
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import logging
11+
from datetime import datetime
12+
from datetime import timedelta
13+
from unittest import mock
14+
15+
import pytest
16+
from packageurl import PackageURL
17+
18+
from vulnerabilities.importer import AdvisoryData
19+
from vulnerabilities.importer import UnMergeablePackageError
20+
from vulnerabilities.models import AdvisoryV2
21+
from vulnerabilities.models import PackageV2
22+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
23+
24+
25+
class DummyImporter(VulnerableCodeBaseImporterPipelineV2):
26+
pipeline_id = "dummy"
27+
log_messages = []
28+
29+
def log(self, message, level=logging.INFO):
30+
self.log_messages.append((level, message))
31+
32+
def collect_advisories(self):
33+
yield from self._advisories
34+
35+
def advisories_count(self):
36+
return len(self._advisories)
37+
38+
39+
@pytest.fixture
40+
def dummy_advisory():
41+
return AdvisoryData(
42+
summary="Test advisory",
43+
aliases=["CVE-2025-0001"],
44+
references_v2=[],
45+
severities=[],
46+
weaknesses=[],
47+
affected_packages=[],
48+
advisory_id="ADV-123",
49+
date_published=datetime.now() - timedelta(days=10),
50+
url="https://example.com/advisory/1",
51+
)
52+
53+
54+
@pytest.fixture
55+
def dummy_importer(dummy_advisory):
56+
importer = DummyImporter()
57+
importer._advisories = [dummy_advisory]
58+
return importer
59+
60+
61+
@pytest.mark.django_db
62+
def test_collect_and_store_advisories(dummy_importer):
63+
dummy_importer.collect_and_store_advisories()
64+
assert len(dummy_importer.log_messages) >= 2
65+
assert "Successfully collected" in dummy_importer.log_messages[-1][1]
66+
assert AdvisoryV2.objects.count() == 1
67+
68+
69+
def test_get_advisory_packages_basic(dummy_importer):
70+
purl = PackageURL("pypi", None, "dummy", "1.0.0")
71+
affected_package = mock.Mock()
72+
affected_package.package = purl
73+
dummy_importer.unfurl_version_ranges = False
74+
75+
with mock.patch(
76+
"vulnerabilities.improvers.default.get_exact_purls", return_value=([purl], [purl])
77+
):
78+
with mock.patch.object(
79+
PackageV2.objects, "get_or_create_from_purl", return_value=(mock.Mock(), True)
80+
) as mock_get:
81+
dummy_importer.get_advisory_packages(
82+
advisory_data=mock.Mock(affected_packages=[affected_package])
83+
)
84+
assert mock_get.call_count == 2 # one affected, one fixed
85+
86+
87+
def test_get_published_package_versions_filters(dummy_importer):
88+
purl = PackageURL("pypi", None, "example", None)
89+
90+
dummy_versions = [
91+
mock.Mock(value="1.0.0", release_date=datetime.now() - timedelta(days=5)),
92+
mock.Mock(value="2.0.0", release_date=datetime.now() + timedelta(days=5)), # future
93+
]
94+
95+
with mock.patch(
96+
"vulnerabilities.pipelines.package_versions.versions", return_value=dummy_versions
97+
):
98+
versions = dummy_importer.get_published_package_versions(purl, until=datetime.now())
99+
assert "1.0.0" in versions
100+
assert "2.0.0" not in versions
101+
102+
103+
def test_get_published_package_versions_failure_logs(dummy_importer):
104+
purl = PackageURL("pypi", None, "example", None)
105+
with mock.patch(
106+
"vulnerabilities.pipelines.package_versions.versions", side_effect=Exception("fail")
107+
):
108+
versions = dummy_importer.get_published_package_versions(purl)
109+
assert versions == []
110+
assert any("Failed to fetch versions" in msg for lvl, msg in dummy_importer.log_messages)
111+
112+
113+
def test_expand_version_range_to_purls(dummy_importer):
114+
purls = list(
115+
dummy_importer.expand_verion_range_to_purls("npm", "lodash", "lodash", ["1.0.0", "1.1.0"])
116+
)
117+
assert all(isinstance(p, PackageURL) for p in purls)
118+
assert purls[0].name == "lodash"
119+
120+
121+
def test_resolve_package_versions(dummy_importer):
122+
dummy_importer.ignorable_versions = []
123+
dummy_importer.expand_verion_range_to_purls = lambda *args, **kwargs: [
124+
PackageURL("npm", None, "a", "1.0.0")
125+
]
126+
127+
with mock.patch(
128+
"vulnerabilities.pipelines.resolve_version_range", return_value=(["1.0.0"], ["1.1.0"])
129+
), mock.patch(
130+
"vulnerabilities.pipelines.get_affected_packages_by_patched_package",
131+
return_value={None: [PackageURL("npm", None, "a", "1.0.0")]},
132+
), mock.patch(
133+
"vulnerabilities.pipelines.nearest_patched_package", return_value=[]
134+
):
135+
aff, fix = dummy_importer.resolve_package_versions(
136+
affected_version_range=">=1.0.0",
137+
pkg_type="npm",
138+
pkg_namespace=None,
139+
pkg_name="a",
140+
valid_versions=["1.0.0", "1.1.0"],
141+
)
142+
assert any(isinstance(p, PackageURL) for p in aff)
143+
144+
145+
def test_get_impacted_packages_mergeable(dummy_importer):
146+
ap = mock.Mock()
147+
ap.package = PackageURL("npm", None, "abc", None)
148+
dummy_importer.get_published_package_versions = lambda package_url, until: ["1.0.0", "1.1.0"]
149+
dummy_importer.resolve_package_versions = lambda **kwargs: (
150+
[PackageURL("npm", None, "abc", "1.0.0")],
151+
[PackageURL("npm", None, "abc", "1.1.0")],
152+
)
153+
154+
with mock.patch(
155+
"vulnerabilities.importer.AffectedPackage.merge",
156+
return_value=(ap.package, [">=1.0.0"], ["1.1.0"]),
157+
):
158+
aff, fix = dummy_importer.get_impacted_packages([ap], datetime.now())
159+
assert len(aff) == 1 and aff[0].version == "1.0.0"
160+
assert len(fix) == 1 and fix[0].version == "1.1.0"
161+
162+
163+
def test_get_impacted_packages_unmergeable(dummy_importer):
164+
ap = mock.Mock()
165+
ap.package = PackageURL("npm", None, "abc", None)
166+
ap.affected_version_range = ">=1.0.0"
167+
ap.fixed_version = None
168+
169+
dummy_importer.get_published_package_versions = lambda package_url, until: ["1.0.0", "1.1.0"]
170+
dummy_importer.resolve_package_versions = lambda **kwargs: (
171+
[PackageURL("npm", None, "abc", "1.0.0")],
172+
[PackageURL("npm", None, "abc", "1.1.0")],
173+
)
174+
175+
with mock.patch(
176+
"vulnerabilities.importer.AffectedPackage.merge", side_effect=UnMergeablePackageError
177+
):
178+
aff, fix = dummy_importer.get_impacted_packages([ap], datetime.utcnow())
179+
assert len(aff) == 1
180+
assert aff[0].version == "1.0.0"

0 commit comments

Comments
 (0)