Skip to content

Commit a7ec364

Browse files
committed
Add tests for vulnrichment importer pipeline v2
Signed-off-by: Tushar Goel <tushar.goel.dav@gmail.com>
1 parent d9580e2 commit a7ec364

File tree

1 file changed

+205
-0
lines changed

1 file changed

+205
-0
lines changed
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import json
11+
from pathlib import Path
12+
from unittest.mock import MagicMock
13+
from unittest.mock import patch
14+
15+
import pytest
16+
17+
from vulnerabilities.importer import AdvisoryData
18+
from vulnerabilities.importer import VulnerabilitySeverity
19+
from vulnerabilities.pipelines.v2_importers.vulnrichment_importer import VulnrichImporterPipeline
20+
21+
22+
@pytest.fixture
23+
def mock_vcs_response():
24+
# Mock the vcs_response from fetch_via_vcs
25+
mock_response = MagicMock()
26+
mock_response.dest_dir = "/mock/repo"
27+
mock_response.delete = MagicMock()
28+
return mock_response
29+
30+
31+
@pytest.fixture
32+
def mock_fetch_via_vcs(mock_vcs_response):
33+
with patch(
34+
"vulnerabilities.pipelines.v2_importers.vulnrichment_importer.fetch_via_vcs"
35+
) as mock:
36+
mock.return_value = mock_vcs_response
37+
yield mock
38+
39+
40+
@pytest.fixture
41+
def mock_pathlib(tmp_path):
42+
# Create a mock filesystem with a 'vulns' directory and JSON files
43+
vulns_dir = tmp_path / "vulns"
44+
vulns_dir.mkdir()
45+
46+
advisory_file = vulns_dir / "CVE-2021-1234.json"
47+
advisory_file.write_text(
48+
json.dumps(
49+
{
50+
"cveMetadata": {
51+
"cveId": "CVE-2021-1234",
52+
"state": "PUBLIC",
53+
"datePublished": "2021-01-01",
54+
},
55+
"containers": {
56+
"cna": {
57+
"descriptions": [{"lang": "en", "value": "Sample PyPI vulnerability"}],
58+
"metrics": [
59+
{
60+
"cvssV4_0": {
61+
"baseScore": 7.5,
62+
"vectorString": "AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H",
63+
}
64+
}
65+
],
66+
"affected": [{"cpes": ["cpe:/a:example:package"]}],
67+
"references": [{"url": "https://example.com", "tags": ["exploit"]}],
68+
}
69+
},
70+
}
71+
)
72+
)
73+
return vulns_dir
74+
75+
76+
def test_clone(mock_fetch_via_vcs, mock_vcs_response):
77+
# Test the `clone` method to ensure the repository is cloned correctly
78+
pipeline = VulnrichImporterPipeline()
79+
pipeline.clone()
80+
81+
mock_fetch_via_vcs.assert_called_once_with(pipeline.repo_url)
82+
assert pipeline.vcs_response == mock_vcs_response
83+
84+
85+
def test_advisories_count(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs):
86+
mock_vcs_response.dest_dir = str(mock_pathlib.parent)
87+
88+
pipeline = VulnrichImporterPipeline()
89+
pipeline.clone()
90+
count = pipeline.advisories_count()
91+
92+
assert count == 0
93+
94+
95+
def test_collect_advisories(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs):
96+
# Mock `vcs_response.dest_dir` to point to the temporary directory
97+
mock_vcs_response.dest_dir = str(mock_pathlib.parent)
98+
99+
# Mock `parse_cve_advisory` to return an AdvisoryData object
100+
with patch(
101+
"vulnerabilities.pipelines.v2_importers.vulnrichment_importer.VulnrichImporterPipeline.parse_cve_advisory"
102+
) as mock_parse:
103+
mock_parse.return_value = AdvisoryData(
104+
advisory_id="CVE-2021-1234",
105+
summary="Sample PyPI vulnerability",
106+
references_v2=[{"url": "https://example.com"}],
107+
affected_packages=[],
108+
weaknesses=[],
109+
url="https://example.com",
110+
severities=[
111+
VulnerabilitySeverity(
112+
system="cvssv4",
113+
value=7.5,
114+
scoring_elements="AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H",
115+
)
116+
],
117+
)
118+
119+
pipeline = VulnrichImporterPipeline()
120+
pipeline.clone()
121+
advisories = list(pipeline.collect_advisories())
122+
123+
# Ensure that advisories are parsed correctly
124+
assert len(advisories) == 1
125+
advisory = advisories[0]
126+
assert advisory.advisory_id == "CVE-2021-1234"
127+
assert advisory.summary == "Sample PyPI vulnerability"
128+
assert advisory.url == "https://example.com"
129+
130+
131+
def test_clean_downloads(mock_vcs_response, mock_fetch_via_vcs):
132+
# Test the `clean_downloads` method to ensure the repository is deleted
133+
pipeline = VulnrichImporterPipeline()
134+
pipeline.clone()
135+
pipeline.vcs_response = mock_vcs_response
136+
137+
pipeline.clean_downloads()
138+
139+
mock_vcs_response.delete.assert_called_once()
140+
141+
142+
def test_on_failure(mock_vcs_response, mock_fetch_via_vcs):
143+
pipeline = VulnrichImporterPipeline()
144+
pipeline.clone()
145+
pipeline.vcs_response = mock_vcs_response
146+
147+
with patch.object(pipeline, "clean_downloads") as mock_clean:
148+
pipeline.on_failure()
149+
150+
mock_clean.assert_called_once()
151+
152+
153+
def test_parse_cve_advisory(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs):
154+
from vulnerabilities.pipelines.v2_importers.vulnrichment_importer import (
155+
VulnrichImporterPipeline,
156+
)
157+
158+
mock_vcs_response.dest_dir = str(mock_pathlib.parent)
159+
160+
raw_data = {
161+
"cveMetadata": {"cveId": "CVE-2021-1234", "state": "PUBLIC", "datePublished": "2021-01-01"},
162+
"containers": {
163+
"cna": {
164+
"descriptions": [{"lang": "en", "value": "Sample PyPI vulnerability"}],
165+
"metrics": [
166+
{
167+
"cvssV4_0": {
168+
"baseScore": 7.5,
169+
"vectorString": "AV:N/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H",
170+
}
171+
}
172+
],
173+
"affected": [{"cpes": ["cpe:/a:example:package"]}],
174+
"references": [{"url": "https://example.com", "tags": ["exploit"]}],
175+
}
176+
},
177+
}
178+
advisory_url = "https://github.com/cisagov/vulnrichment/blob/develop/CVE-2021-1234.json"
179+
180+
pipeline = VulnrichImporterPipeline()
181+
pipeline.clone()
182+
advisory = pipeline.parse_cve_advisory(raw_data, advisory_url)
183+
184+
assert advisory.advisory_id == "CVE-2021-1234"
185+
assert advisory.summary == "Sample PyPI vulnerability"
186+
assert advisory.url == advisory_url
187+
assert len(advisory.severities) == 1
188+
assert advisory.severities[0].value == 7.5
189+
190+
191+
def test_collect_advisories_with_invalid_json(mock_pathlib, mock_vcs_response, mock_fetch_via_vcs):
192+
invalid_file = mock_pathlib / "invalid_file.json"
193+
invalid_file.write_text("invalid_json")
194+
195+
mock_vcs_response.dest_dir = str(mock_pathlib.parent)
196+
197+
with patch(
198+
"vulnerabilities.pipelines.v2_importers.vulnrichment_importer.VulnrichImporterPipeline.parse_cve_advisory"
199+
) as mock_parse:
200+
mock_parse.side_effect = json.JSONDecodeError("Invalid JSON", "", 0)
201+
202+
pipeline = VulnrichImporterPipeline()
203+
pipeline.clone()
204+
with pytest.raises(json.JSONDecodeError):
205+
list(pipeline.collect_advisories())

0 commit comments

Comments
 (0)