|
| 1 | +import json |
| 2 | +from types import SimpleNamespace |
| 3 | + |
| 4 | +import pytz |
| 5 | +from packageurl import PackageURL |
| 6 | +from univers.version_range import NpmVersionRange |
| 7 | +from univers.versions import SemverVersion |
| 8 | + |
| 9 | +from vulnerabilities.importer import AdvisoryData |
| 10 | +from vulnerabilities.pipelines.v2_importers.npm_importer import NpmImporterPipeline |
| 11 | +from vulnerabilities.severity_systems import CVSSV2 |
| 12 | +from vulnerabilities.severity_systems import CVSSV3 |
| 13 | + |
| 14 | + |
| 15 | +def test_clone(monkeypatch): |
| 16 | + import vulnerabilities.pipelines.v2_importers.npm_importer as npm_mod |
| 17 | + |
| 18 | + dummy = SimpleNamespace(dest_dir="dummy", delete=lambda: None) |
| 19 | + # Patch the name in the npm_importer module, not fetchcode.vcs |
| 20 | + monkeypatch.setattr(npm_mod, "fetch_via_vcs", lambda url: dummy) |
| 21 | + |
| 22 | + p = NpmImporterPipeline() |
| 23 | + p.clone() |
| 24 | + |
| 25 | + assert p.vcs_response is dummy |
| 26 | + |
| 27 | + |
| 28 | +def test_clean_downloads_and_on_failure(): |
| 29 | + called = {} |
| 30 | + |
| 31 | + def delete(): |
| 32 | + called["deleted"] = True |
| 33 | + |
| 34 | + dummy = SimpleNamespace(dest_dir="dummy", delete=delete) |
| 35 | + p = NpmImporterPipeline() |
| 36 | + p.vcs_response = dummy |
| 37 | + p.clean_downloads() |
| 38 | + assert called.get("deleted", False) |
| 39 | + called.clear() |
| 40 | + p.on_failure() |
| 41 | + assert called.get("deleted", False) |
| 42 | + |
| 43 | + |
| 44 | +def test_advisories_count_and_collect(tmp_path): |
| 45 | + base = tmp_path |
| 46 | + vuln_dir = base / "vuln" / "npm" |
| 47 | + vuln_dir.mkdir(parents=True) |
| 48 | + (vuln_dir / "index.json").write_text("{}") |
| 49 | + (vuln_dir / "001.json").write_text(json.dumps({"id": "001"})) |
| 50 | + p = NpmImporterPipeline() |
| 51 | + p.vcs_response = SimpleNamespace(dest_dir=str(base), delete=lambda: None) |
| 52 | + assert p.advisories_count() == 2 |
| 53 | + advisories = list(p.collect_advisories()) |
| 54 | + # Should yield None for index.json and one AdvisoryData |
| 55 | + real = [a for a in advisories if isinstance(a, AdvisoryData)] |
| 56 | + assert len(real) == 1 |
| 57 | + assert real[0].advisory_id == "NODESEC-NPM-001" |
| 58 | + |
| 59 | + |
| 60 | +def test_to_advisory_data_skips_index(tmp_path): |
| 61 | + p = NpmImporterPipeline() |
| 62 | + file = tmp_path / "index.json" |
| 63 | + file.write_text("{}") |
| 64 | + assert p.to_advisory_data(file) is None |
| 65 | + |
| 66 | + |
| 67 | +def test_to_advisory_data_full(tmp_path): |
| 68 | + data = { |
| 69 | + "id": "123", |
| 70 | + "overview": "desc", |
| 71 | + "title": "ti", |
| 72 | + "created_at": "2021-01-01T00:00:00Z", |
| 73 | + "cvss_vector": "CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H", |
| 74 | + "cvss_score": "9.8", |
| 75 | + "references": ["http://ref1"], |
| 76 | + "module_name": "mypkg", |
| 77 | + "vulnerable_versions": "<=1.2.3", |
| 78 | + "patched_versions": ">=1.2.4", |
| 79 | + "cves": ["CVE-123", "CVE-124"], |
| 80 | + } |
| 81 | + file = tmp_path / "123.json" |
| 82 | + file.write_text(json.dumps(data)) |
| 83 | + p = NpmImporterPipeline() |
| 84 | + adv = p.to_advisory_data(file) |
| 85 | + assert isinstance(adv, AdvisoryData) |
| 86 | + assert adv.advisory_id == "NODESEC-NPM-123" |
| 87 | + assert "ti" in adv.summary and "desc" in adv.summary |
| 88 | + assert adv.date_published.tzinfo == pytz.UTC |
| 89 | + assert len(adv.severities) == 1 and adv.severities[0].system == CVSSV3 |
| 90 | + urls = [r.url for r in adv.references_v2] |
| 91 | + assert "http://ref1" in urls |
| 92 | + assert f"https://github.com/nodejs/security-wg/blob/main/vuln/npm/123.json" in urls |
| 93 | + pkg = adv.affected_packages[0] |
| 94 | + assert pkg.package == PackageURL(type="npm", name="mypkg") |
| 95 | + assert isinstance(pkg.affected_version_range, NpmVersionRange) |
| 96 | + assert pkg.fixed_version == SemverVersion("1.2.4") |
| 97 | + assert set(adv.aliases) == {"CVE-123", "CVE-124"} |
| 98 | + |
| 99 | + |
| 100 | +def test_to_advisory_data_cvss_v2(tmp_path): |
| 101 | + data = {"id": "124", "cvss_vector": "CVSS:2.0/AV:N/AC:L/Au:N/C:P/I:P/A:P", "cvss_score": "5.5"} |
| 102 | + file = tmp_path / "124.json" |
| 103 | + file.write_text(json.dumps(data)) |
| 104 | + p = NpmImporterPipeline() |
| 105 | + adv = p.to_advisory_data(file) |
| 106 | + assert len(adv.severities) == 1 and adv.severities[0].system == CVSSV2 |
| 107 | + |
| 108 | + |
| 109 | +def test_get_affected_package_special_and_standard(): |
| 110 | + p = NpmImporterPipeline() |
| 111 | + pkg = p.get_affected_package( |
| 112 | + {"vulnerable_versions": "<=99.999.99999", "patched_versions": "<0.0.0"}, "pkg" |
| 113 | + ) |
| 114 | + assert isinstance(pkg.affected_version_range, NpmVersionRange) |
| 115 | + assert pkg.fixed_version is None |
| 116 | + data2 = {"vulnerable_versions": "<=2.0.0", "patched_versions": ">=2.0.1"} |
| 117 | + pkg2 = p.get_affected_package(data2, "pkg2") |
| 118 | + assert isinstance(pkg2.affected_version_range, NpmVersionRange) |
| 119 | + assert pkg2.fixed_version == SemverVersion("2.0.1") |
0 commit comments