|
| 1 | +import pytest |
| 2 | +import requests |
| 3 | + |
| 4 | +from vulnerabilities.importer import AdvisoryData |
| 5 | +from vulnerabilities.pipelines.v2_importers.apache_httpd_importer import ApacheHTTPDImporterPipeline |
| 6 | +from vulnerabilities.pipelines.v2_importers.apache_httpd_importer import fetch_links |
| 7 | +from vulnerabilities.pipelines.v2_importers.apache_httpd_importer import get_weaknesses |
| 8 | +from vulnerabilities.severity_systems import APACHE_HTTPD |
| 9 | + |
| 10 | + |
| 11 | +# Dummy responses |
| 12 | +class DummyResponseContent: |
| 13 | + def __init__(self, content_bytes): |
| 14 | + self.content = content_bytes |
| 15 | + |
| 16 | + |
| 17 | +class DummyResponseJSON: |
| 18 | + def __init__(self, json_data): |
| 19 | + self._json = json_data |
| 20 | + |
| 21 | + def json(self): |
| 22 | + return self._json |
| 23 | + |
| 24 | + |
| 25 | +# Tests for fetch_links |
| 26 | +@pytest.fixture(autouse=True) |
| 27 | +def no_requests(monkeypatch): |
| 28 | + # Ensure other tests don't hit real HTTP |
| 29 | + monkeypatch.setattr( |
| 30 | + requests, |
| 31 | + "get", |
| 32 | + lambda url: (_ for _ in ()).throw(AssertionError(f"Unexpected HTTP GET call to {url}")), |
| 33 | + ) |
| 34 | + |
| 35 | + |
| 36 | +def test_fetch_links_filters_and_resolves(monkeypatch): |
| 37 | + html = """ |
| 38 | + <html><body> |
| 39 | + <a href="advisory1.json">A1</a> |
| 40 | + <a href="/json/advisory2.json">A2</a> |
| 41 | + <a href="readme.txt">TXT</a> |
| 42 | + </body></html> |
| 43 | + """ |
| 44 | + base_url = "https://example.com/base/" |
| 45 | + # Monkeypatch HTTP GET for HTML |
| 46 | + def fake_get(url): |
| 47 | + assert url == base_url |
| 48 | + return DummyResponseContent(html.encode("utf-8")) |
| 49 | + |
| 50 | + monkeypatch.setattr(requests, "get", fake_get) |
| 51 | + links = fetch_links(base_url) |
| 52 | + assert len(links) == 2 |
| 53 | + assert links == [ |
| 54 | + "https://example.com/base/advisory1.json", |
| 55 | + "https://example.com/json/advisory2.json", |
| 56 | + ] |
| 57 | + |
| 58 | + |
| 59 | +# Tests for get_weaknesses |
| 60 | +def test_get_weaknesses_with_cna_structure(): |
| 61 | + mock_data = { |
| 62 | + "containers": {"cna": {"problemTypes": [{"descriptions": [{"cweId": "CWE-125"}]}]}} |
| 63 | + } |
| 64 | + result = get_weaknesses(mock_data) |
| 65 | + assert result == [125] |
| 66 | + |
| 67 | + |
| 68 | +def test_get_weaknesses_with_data_meta_structure(): |
| 69 | + mock_data = { |
| 70 | + "CVE_data_meta": {"ID": "CVE-2020-0001"}, |
| 71 | + "problemtype": { |
| 72 | + "problemtype_data": [ |
| 73 | + {"description": [{"value": "CWE-190 Integer Overflow"}]}, |
| 74 | + {"description": [{"value": "CWE-200 Some Issue"}]}, |
| 75 | + ] |
| 76 | + }, |
| 77 | + } |
| 78 | + result = get_weaknesses(mock_data) |
| 79 | + assert set(result) == {190, 200} |
| 80 | + |
| 81 | + |
| 82 | +# Tests for ApacheHTTPDImporterPipeline |
| 83 | +class DummyPipeline(ApacheHTTPDImporterPipeline): |
| 84 | + # Expose protected methods for testing |
| 85 | + pass |
| 86 | + |
| 87 | + |
| 88 | +@pytest.fixture |
| 89 | +def pipeline(monkeypatch): |
| 90 | + pipe = DummyPipeline() |
| 91 | + # Prevent real HTTP in fetch_links |
| 92 | + monkeypatch.setattr( |
| 93 | + "vulnerabilities.pipelines.v2_importers.apache_httpd_importer.fetch_links", lambda url: ["u1", "u2"] |
| 94 | + ) |
| 95 | + return pipe |
| 96 | + |
| 97 | + |
| 98 | +def test_advisories_count(monkeypatch, pipeline): |
| 99 | + # Should use mocked links |
| 100 | + count = pipeline.advisories_count() |
| 101 | + assert count == 2 |
| 102 | + |
| 103 | + |
| 104 | +def test_collect_advisories_and_to_advisory(monkeypatch, pipeline): |
| 105 | + # Prepare two dummy JSONs |
| 106 | + sample1 = { |
| 107 | + "CVE_data_meta": {"ID": "CVE-1"}, |
| 108 | + "description": {"description_data": [{"lang": "eng", "value": "Test desc"}]}, |
| 109 | + "impact": [{"other": "5.0"}], |
| 110 | + "affects": {"vendor": {"vendor_data": []}}, |
| 111 | + "timeline": [], |
| 112 | + } |
| 113 | + sample2 = { |
| 114 | + "cveMetadata": {"cveId": "CVE-2"}, |
| 115 | + "description": {"description_data": [{"lang": "eng", "value": "Other desc"}]}, |
| 116 | + "impact": [{"other": "7.5"}], |
| 117 | + "affects": {"vendor": {"vendor_data": []}}, |
| 118 | + "timeline": [], |
| 119 | + } |
| 120 | + # Monkeypatch requests.get to return JSON |
| 121 | + def fake_get(u): |
| 122 | + if u == "u1": |
| 123 | + return DummyResponseJSON(sample1) |
| 124 | + elif u == "u2": |
| 125 | + return DummyResponseJSON(sample2) |
| 126 | + else: |
| 127 | + raise AssertionError(f"Unexpected URL {u}") |
| 128 | + |
| 129 | + monkeypatch.setattr(requests, "get", fake_get) |
| 130 | + advisories = list(pipeline.collect_advisories()) |
| 131 | + assert len(advisories) == 2 |
| 132 | + # Validate first advisory |
| 133 | + adv1 = advisories[0] |
| 134 | + assert isinstance(adv1, AdvisoryData) |
| 135 | + assert adv1.advisory_id == "CVE-1" |
| 136 | + assert adv1.summary == "Test desc" |
| 137 | + assert adv1.severities and adv1.severities[0].value == "5.0" |
| 138 | + assert adv1.url.endswith("CVE-1.json") |
| 139 | + # Validate second advisory |
| 140 | + adv2 = advisories[1] |
| 141 | + assert adv2.advisory_id == "CVE-2" |
| 142 | + assert adv2.summary == "Other desc" |
| 143 | + assert adv2.severities[0].value == "7.5" |
| 144 | + |
| 145 | + |
| 146 | +# Test version range conversion error |
| 147 | +def test_to_version_ranges_unknown_comparator(pipeline): |
| 148 | + # version_data with bad comparator |
| 149 | + versions_data = [{"version_value": "1.0.0", "version_affected": "<>"}] |
| 150 | + fixed_versions = [] |
| 151 | + with pytest.raises(ValueError): |
| 152 | + pipeline.to_version_ranges(versions_data, fixed_versions) |
0 commit comments