Skip to content

Commit b91f43f

Browse files
committed
Add tests for github importer pipeline
Signed-off-by: Tushar Goel <tushar.goel.dav@gmail.com>
1 parent 1a879f9 commit b91f43f

File tree

1 file changed

+185
-0
lines changed

1 file changed

+185
-0
lines changed
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import pytest
2+
from unittest.mock import patch, MagicMock
3+
from datetime import datetime
4+
5+
from vulnerabilities.importer import AdvisoryData, AffectedPackage, Reference, VulnerabilitySeverity
6+
from vulnerabilities.utils import get_item
7+
from packageurl import PackageURL
8+
from univers.version_constraint import VersionConstraint
9+
from univers.version_range import RANGE_CLASS_BY_SCHEMES
10+
from univers.versions import SemverVersion
11+
from vulnerabilities.pipelines.v2_importers.github_importer import GitHubAPIImporterPipeline
12+
from vulnerabilities.pipelines.v2_importers.github_importer import get_cwes_from_github_advisory, get_purl
13+
14+
@pytest.fixture
15+
def mock_fetch():
16+
with patch("vulnerabilities.pipelines.v2_importers.github_importer.utils.fetch_github_graphql_query") as mock:
17+
yield mock
18+
19+
20+
def test_advisories_count(mock_fetch):
21+
# Mock the GraphQL query response for advisory count
22+
mock_fetch.return_value = {
23+
"data": {
24+
"securityVulnerabilities": {
25+
"totalCount": 10
26+
}
27+
}
28+
}
29+
30+
pipeline = GitHubAPIImporterPipeline()
31+
32+
count = pipeline.advisories_count()
33+
34+
# Assert that the count is correct
35+
assert count == 10
36+
37+
38+
def test_collect_advisories(mock_fetch):
39+
# Mock advisory data for GitHub
40+
advisory_data = {
41+
"data": {
42+
"securityVulnerabilities": {
43+
"edges": [
44+
{
45+
"node": {
46+
"advisory": {
47+
"identifiers": [
48+
{"type": "GHSA", "value": "GHSA-1234-ABCD"}
49+
],
50+
"summary": "Sample advisory description",
51+
"references": [{"url": "https://github.com/advisories/GHSA-1234-ABCD"}],
52+
"severity": "HIGH",
53+
"cwes": {
54+
"nodes": [{"cweId": "CWE-123"}]
55+
},
56+
"publishedAt": "2023-01-01T00:00:00Z"
57+
},
58+
"firstPatchedVersion": {"identifier": "1.2.3"},
59+
"package": {"name": "example-package"},
60+
"vulnerableVersionRange": ">=1.0.0,<=1.2.0"
61+
}
62+
}
63+
],
64+
"pageInfo": {
65+
"hasNextPage": False,
66+
"endCursor": None
67+
}
68+
}
69+
}
70+
}
71+
72+
# Mock the response from GitHub GraphQL query
73+
mock_fetch.return_value = advisory_data
74+
75+
# Instantiate the pipeline
76+
pipeline = GitHubAPIImporterPipeline()
77+
78+
# Collect advisories
79+
advisories = list(pipeline.collect_advisories())
80+
81+
# Check if advisories were correctly parsed
82+
assert len(advisories) == 1
83+
advisory = advisories[0]
84+
85+
# Validate advisory fields
86+
assert advisory.advisory_id == "GHSA-1234-ABCD"
87+
assert advisory.summary == "Sample advisory description"
88+
assert advisory.url == "https://github.com/advisories/GHSA-1234-ABCD"
89+
assert len(advisory.references_v2) == 1
90+
assert advisory.references_v2[0].reference_id == "GHSA-1234-ABCD"
91+
assert advisory.severities[0].value == "HIGH"
92+
93+
# Validate affected package and version range
94+
affected_package = advisory.affected_packages[0]
95+
assert isinstance(affected_package.package, PackageURL)
96+
assert affected_package.package.name == "example-package"
97+
98+
# Check CWE extraction
99+
assert advisory.weaknesses == [123]
100+
101+
102+
def test_get_purl(mock_fetch):
103+
# Test for package URL generation
104+
result = get_purl("cargo", "example/package-name")
105+
106+
# Validate that the correct PackageURL is generated
107+
assert isinstance(result, PackageURL)
108+
assert result.type == "cargo"
109+
assert result.namespace == None
110+
assert result.name == "example/package-name"
111+
112+
113+
def test_process_response(mock_fetch):
114+
# Mock advisory data as input for the process_response function
115+
advisory_data = {
116+
"data": {
117+
"securityVulnerabilities": {
118+
"edges": [
119+
{
120+
"node": {
121+
"advisory": {
122+
"identifiers": [
123+
{"type": "GHSA", "value": "GHSA-5678-EFGH"}
124+
],
125+
"summary": "Another advisory",
126+
"references": [{"url": "https://github.com/advisories/GHSA-5678-EFGH"}],
127+
"severity": "MEDIUM",
128+
"cwes": {
129+
"nodes": [{"cweId": "CWE-200"}]
130+
},
131+
"publishedAt": "2023-02-01T00:00:00Z"
132+
},
133+
"firstPatchedVersion": {"identifier": "2.0.0"},
134+
"package": {"name": "another-package"},
135+
"vulnerableVersionRange": ">=2.0.0,<=3.0.0"
136+
}
137+
}
138+
],
139+
"pageInfo": {
140+
"hasNextPage": False,
141+
"endCursor": None
142+
}
143+
}
144+
}
145+
}
146+
147+
# Mock the response from GitHub GraphQL query
148+
mock_fetch.return_value = advisory_data
149+
150+
# Process the mock response
151+
result = list(GitHubAPIImporterPipeline().collect_advisories())
152+
153+
# Check the results
154+
assert len(result) == 1
155+
advisory = result[0]
156+
157+
# Validate the advisory data
158+
assert advisory.advisory_id == "GHSA-5678-EFGH"
159+
assert advisory.summary == "Another advisory"
160+
assert advisory.url == "https://github.com/advisories/GHSA-5678-EFGH"
161+
162+
# Check CWE extraction
163+
assert advisory.weaknesses == [200]
164+
165+
166+
def test_get_cwes_from_github_advisory(mock_fetch):
167+
# Mock CWEs extraction from GitHub advisory
168+
advisory_data = {
169+
"cwes": {
170+
"nodes": [{"cweId": "CWE-522"}]
171+
}
172+
}
173+
174+
cwes = get_cwes_from_github_advisory(advisory_data)
175+
176+
# Validate the CWE ID extraction
177+
assert cwes == [522]
178+
179+
180+
def test_invalid_package_type_in_get_purl(mock_fetch):
181+
# Test for invalid package type
182+
result = get_purl("invalidpkg", "example/package-name")
183+
184+
# Assert that None is returned for an invalid package type
185+
assert result is None

0 commit comments

Comments
 (0)