From 5fdd6574742aea87d060fcc257ddbeeb42321357 Mon Sep 17 00:00:00 2001 From: Daniel Sanz <13658011+sdn4z@users.noreply.github.com> Date: Wed, 10 Sep 2025 14:29:12 +0200 Subject: [PATCH] feat: use repo as source of truth BREAKING CHANGE --- README.md | 15 +-- src/twyn/trusted_packages/references/base.py | 38 +++--- .../references/top_npm_reference.py | 22 +--- .../references/top_pypi_reference.py | 21 +--- tests/conftest.py | 11 +- tests/main/test_main.py | 26 ---- tests/trusted_packages/test_references.py | 118 +++++++++++------- 7 files changed, 113 insertions(+), 138 deletions(-) diff --git a/README.md b/README.md index 7e0ad89f..f150fb36 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ [![Python Version](https://img.shields.io/badge/python-3.9%20%7C%203.10%20%7C%203.11%20%7C%203.12%20%7C%203.13-blue?logo=python&logoColor=yellow)](https://pypi.org/project/twyn/) [![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) [![License](https://img.shields.io/github/license/elementsinteractive/twyn)](LICENSE) + ## Table of Contents - [Overview](#overview) @@ -171,18 +172,14 @@ allowlist=["my_package"] source="https://mirror-with-trusted-dependencies.com/file.json" ``` -> [!WARNING] -> `twyn` will have a default reference URL for every source of trusted packages that is configurable. -> If you want to protect yourself against spoofing attacks, it is recommended to set your own -> reference url. - The file format for each reference is as follows: -- **PyPI reference**: - -```ts +```jsonc { - rows: {project: string}[] + "date": "string (ISO 8601 format, e.g. 
2025-09-10T14:23:00+00)", + "packages": [ + "string" + ] } ``` diff --git a/src/twyn/trusted_packages/references/base.py b/src/twyn/trusted_packages/references/base.py index 6200afde..90fd14f6 100644 --- a/src/twyn/trusted_packages/references/base.py +++ b/src/twyn/trusted_packages/references/base.py @@ -7,6 +7,7 @@ from twyn.trusted_packages.cache_handler import CacheEntry, CacheHandler from twyn.trusted_packages.exceptions import ( + EmptyPackagesListError, InvalidJSONError, ) @@ -28,26 +29,19 @@ def __init__(self, source: Optional[str] = None, cache_handler: Union[CacheHandl self.source = source or self.DEFAULT_SOURCE self.cache_handler = cache_handler - @staticmethod - @abstractmethod - def _parse(packages_json: dict[str, Any]) -> set[str]: - """Parse and retrieve the packages within the given json structure.""" - @staticmethod @abstractmethod def normalize_packages(packages: set[str]) -> set[str]: """Normalize package names to make sure they're valid within the package manager context.""" def _download(self) -> dict[str, Any]: - packages = requests.get(self.source) - packages.raise_for_status() + response = requests.get(self.source) + response.raise_for_status() + try: - packages_json: dict[str, Any] = packages.json() + return response.json() except requests.exceptions.JSONDecodeError as err: raise InvalidJSONError from err - else: - logger.debug("Successfully downloaded trusted packages list from %s", self.source) - return packages_json def _save_trusted_packages_to_cache_if_enabled(self, packages: set[str]) -> None: """Save trusted packages using CacheHandler.""" @@ -69,18 +63,24 @@ def _get_packages_from_cache_if_enabled(self) -> set[str]: return cache_entry.packages def get_packages(self) -> set[str]: - """Download and parse online source of top Python Package Index packages.""" - packages_to_use = set() - packages_to_use = self._get_packages_from_cache_if_enabled() + """Download and parse online source of top packages from the package 
ecosystem.""" + packages = self._get_packages_from_cache_if_enabled() # we don't save the cache here, we keep it as it is so the date remains the original one. - - if not packages_to_use: + if not packages: # no cache usage, no cache hit (non-existent or outdated) or cache was empty. logger.info("Fetching trusted packages from trusted packages reference...") - packages_to_use = self._parse(self._download()) + data = self._download() + try: + packages = set(data["packages"]) + except KeyError as err: + raise InvalidJSONError("`packages` key not in JSON.") from err + + logger.debug("Successfully downloaded trusted packages list from %s", self.source) + if not packages: + raise EmptyPackagesListError # New packages were downloaded, we create a new entry updating all values. - self._save_trusted_packages_to_cache_if_enabled(packages_to_use) + self._save_trusted_packages_to_cache_if_enabled(packages) - normalized_packages = self.normalize_packages(packages_to_use) + normalized_packages = self.normalize_packages(packages) return normalized_packages diff --git a/src/twyn/trusted_packages/references/top_npm_reference.py b/src/twyn/trusted_packages/references/top_npm_reference.py index 4f798cff..30105ba2 100644 --- a/src/twyn/trusted_packages/references/top_npm_reference.py +++ b/src/twyn/trusted_packages/references/top_npm_reference.py @@ -1,12 +1,9 @@ import logging import re -from typing import Any from typing_extensions import override from twyn.trusted_packages.exceptions import ( - EmptyPackagesListError, - InvalidReferenceFormatError, PackageNormalizingError, ) from twyn.trusted_packages.references.base import AbstractPackageReference @@ -17,22 +14,9 @@ class TopNpmReference(AbstractPackageReference): """Top npm packages retrieved from an online source.""" - DEFAULT_SOURCE: str = "https://www.npmleaderboard.org/api/packages" - - @override - @staticmethod - def _parse(packages_info: dict[str, Any]) -> set[str]: - try: - names = {pkg["name"] for pkg in 
packages_info["packages"]} - - except KeyError as err: - raise InvalidReferenceFormatError from err - - if not names: - raise EmptyPackagesListError - - logger.debug("Successfully parsed trusted packages list") - return names + DEFAULT_SOURCE: str = ( + "https://raw.githubusercontent.com/elementsinteractive/twyn/refs/heads/main/dependencies/npm.json" + ) @override @staticmethod diff --git a/src/twyn/trusted_packages/references/top_pypi_reference.py b/src/twyn/trusted_packages/references/top_pypi_reference.py index d8cfc0e1..6f1b6ca9 100644 --- a/src/twyn/trusted_packages/references/top_pypi_reference.py +++ b/src/twyn/trusted_packages/references/top_pypi_reference.py @@ -1,12 +1,9 @@ import logging import re -from typing import Any from typing_extensions import override from twyn.trusted_packages.exceptions import ( - EmptyPackagesListError, - InvalidReferenceFormatError, PackageNormalizingError, ) from twyn.trusted_packages.references.base import AbstractPackageReference @@ -17,21 +14,9 @@ class TopPyPiReference(AbstractPackageReference): """Top PyPi packages retrieved from an online source.""" - DEFAULT_SOURCE: str = "https://hugovk.github.io/top-pypi-packages/top-pypi-packages.min.json" - - @override - @staticmethod - def _parse(packages_info: dict[str, Any]) -> set[str]: - try: - names = {row["project"] for row in packages_info["rows"]} - except KeyError as err: - raise InvalidReferenceFormatError from err - - if not names: - raise EmptyPackagesListError - - logger.debug("Successfully parsed trusted packages list") - return names + DEFAULT_SOURCE: str = ( + "https://raw.githubusercontent.com/elementsinteractive/twyn/refs/heads/main/dependencies/pypi.json" + ) @override @staticmethod diff --git a/tests/conftest.py b/tests/conftest.py index a9203cdc..82a34035 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ -from collections.abc import Iterable, Iterator +import datetime +from collections.abc import Iterator from contextlib import 
contextmanager from pathlib import Path from unittest import mock @@ -14,12 +15,12 @@ def create_tmp_file(path: Path, data: str) -> Iterator[Path]: @contextmanager -def patch_pypi_packages_download(packages: Iterable[str]) -> Iterator[mock.Mock]: +def patch_pypi_packages_download(packages: list[str]) -> Iterator[mock.Mock]: """Patcher of `requests.get` for Top PyPi list. Replaces call with the output you would get from downloading the top PyPi packages list. """ - json_response = {"rows": [{"project": name} for name in packages]} + json_response = {"packages": packages, "date": datetime.datetime.now().isoformat()} with mock.patch("twyn.trusted_packages.TopPyPiReference._download") as mock_download: mock_download.return_value = json_response @@ -28,12 +29,12 @@ def patch_pypi_packages_download(packages: Iterable[str]) -> Iterator[mock.Mock] @contextmanager -def patch_npm_packages_download(packages: Iterable[str]) -> Iterator[mock.Mock]: +def patch_npm_packages_download(packages: list[str]) -> Iterator[mock.Mock]: """Patcher of `requests.get` for Top Npm list. Replaces call with the output you would get from downloading the top Npm packages list. 
""" - json_response = {"packages": [{"name": name} for name in packages]} + json_response = {"packages": packages, "date": datetime.datetime.now().isoformat()} with mock.patch("twyn.trusted_packages.TopNpmReference._download") as mock_download: mock_download.return_value = json_response diff --git a/tests/main/test_main.py b/tests/main/test_main.py index 7beb15fe..e56eef80 100644 --- a/tests/main/test_main.py +++ b/tests/main/test_main.py @@ -286,32 +286,6 @@ def test_check_dependencies_ignores_package_in_allowlist( assert error == TyposquatCheckResultList(errors=[]) - @pytest.mark.parametrize( - "package_name", - [ - "my.package", - "my-package", - "my_package", - "My.Package", - ], - ) - @patch("twyn.trusted_packages.TopPyPiReference._get_packages_from_cache_if_enabled") - def test_normalize_package(self, mock_get_packages_from_cache: Mock, package_name: Mock) -> None: - mock_get_packages_from_cache.return_value = {"requests", "mypackage"} - error = check_dependencies( - config_file=None, - dependency_file=None, - dependencies={package_name}, - selector_method="first-letter", - package_ecosystem="pypi", - ) - - assert error == TyposquatCheckResultList( - errors=[ - TyposquatCheckResult(dependency="my-package", similars=["mypackage"]), - ] - ) - @patch("twyn.trusted_packages.TopPyPiReference.get_packages") def test_check_dependencies_does_not_error_on_same_package( self, mock_get_packages: Mock, uv_lock_file_with_typo: Path diff --git a/tests/trusted_packages/test_references.py b/tests/trusted_packages/test_references.py index eb972f18..115b460e 100644 --- a/tests/trusted_packages/test_references.py +++ b/tests/trusted_packages/test_references.py @@ -11,7 +11,7 @@ from twyn.trusted_packages.exceptions import ( EmptyPackagesListError, InvalidJSONError, - InvalidReferenceFormatError, + PackageNormalizingError, ) from twyn.trusted_packages.references.base import AbstractPackageReference @@ -34,7 +34,7 @@ def test_get_packages(self) -> None: @freeze_time("2025-8-19") 
def test_get_trusted_packages_uses_valid_cache(self, tmp_path: Path) -> None: """Test that valid cached data is loaded and used without fetching from PyPI.""" - packages = {"requests", "flask", "django", "fastapi"} + packages = ["requests", "flask", "django", "fastapi"] cache_handler = CacheHandler(str(tmp_path / "cache")) cache_entry = CacheEntry(saved_date="2025-08-18", packages=packages) @@ -44,7 +44,7 @@ def test_get_trusted_packages_uses_valid_cache(self, tmp_path: Path) -> None: retrieved_cache_entry = cache_handler.get_cache_entry("pypi") assert retrieved_cache_entry is not None assert retrieved_cache_entry.saved_date == "2025-08-18" - assert retrieved_cache_entry.packages == packages + assert retrieved_cache_entry.packages == set(packages) with patch_pypi_packages_download(packages) as m_pypi: result = TopPyPiReference("pypi", cache_handler=cache_handler).get_packages() @@ -110,7 +110,7 @@ def test_get_packages_downloads_when_cache_has_invalid_package_names(self, tmp_p @freeze_time("2025-8-21", tz_offset=0) def test_cache_is_saved_when_not_existing(self, tmp_path: Path) -> None: """Test that cache starts empty and gets filled after downloading packages.""" - cached_packages = {"numpy", "requests", "django"} + cached_packages = ["numpy", "requests", "django"] cache_handler = CacheHandler(str(tmp_path / "cache")) with patch_pypi_packages_download(cached_packages) as m_pypi: pypi_ref = TopPyPiReference(source="pypi", cache_handler=cache_handler) @@ -119,12 +119,12 @@ def test_cache_is_saved_when_not_existing(self, tmp_path: Path) -> None: # The packages were downloaded and match the expected result assert m_pypi.call_count == 1 - assert retrieved_packages == cached_packages + assert retrieved_packages == set(cached_packages) # The packages were saved to the cache file, with its associated metadata cache_content = cache_handler.get_cache_entry("pypi") - assert set(cache_content.packages) == cached_packages + assert set(cache_content.packages) == 
set(cached_packages) assert cache_content.saved_date == "2025-08-21" @patch("requests.get") @@ -140,6 +140,24 @@ def test__download_json_exception(self, mock_get: Mock) -> None: ): top_pypi._download() + def test_get_packages_no_packages_key(self) -> None: + top_pypi = TopPyPiReference(source="foo", cache_handler=CacheHandler()) + + with patch("twyn.trusted_packages.TopPyPiReference._download") as mock_download: + mock_download.return_value = {} + with pytest.raises(InvalidJSONError, match="`packages` key not in JSON."): + top_pypi.get_packages() + + def test_empty_packages_list_exception(self) -> None: + with ( + pytest.raises( + EmptyPackagesListError, + match="Downloaded packages list is empty", + ), + patch_pypi_packages_download([]), + ): + TopPyPiReference().get_packages() + class TestTopPyPiReference: def test_get_trusted_packages(self, tmp_path: Path) -> None: @@ -152,29 +170,60 @@ def test_get_trusted_packages(self, tmp_path: Path) -> None: assert packages == {"foo", "bar", "django", "requests", "sqlalchemy"} assert m_pypi.call_count == 1 - def test__parse_no_rows(self) -> None: - data = {"bananas": 5} - top_pypi = TopPyPiReference(source="foo", cache_handler=CacheHandler()) - - with pytest.raises(InvalidReferenceFormatError, match="Invalid JSON format."): - top_pypi._parse(data) + @pytest.mark.parametrize( + "package_name", + [ + "my.package", + "my-package", + "my_package", + "My.Package", + ], + ) + @patch("twyn.trusted_packages.TopPyPiReference._get_packages_from_cache_if_enabled") + def test_normalize_package_when_loaded_from_cache( + self, mock_get_packages_from_cache: Mock, package_name: Mock, tmp_path: Path + ) -> None: + mock_get_packages_from_cache.return_value = {package_name} + + with patch_pypi_packages_download([]) as m_pypi: + ref = TopPyPiReference(cache_handler=CacheHandler(str(tmp_path / "cache"))) + packages = ref.get_packages() - def test_empty_packages_list_exception(self) -> None: - with pytest.raises( - EmptyPackagesListError, - 
match="Downloaded packages list is empty", - ): - TopPyPiReference._parse({"rows": []}) + assert packages == {"my-package"} + assert m_pypi.call_count == 0 + assert mock_get_packages_from_cache.call_count == 1 + + @pytest.mark.parametrize( + "package_name", + [ + "my.package", + "my-package", + "my_package", + "My.Package", + ], + ) + @patch("twyn.trusted_packages.TopPyPiReference._get_packages_from_cache_if_enabled") + def test_normalize_package_when_downloaded( + self, mock_get_packages_from_cache: Mock, package_name: Mock, tmp_path: Path + ) -> None: + mock_get_packages_from_cache.return_value = {} + + with patch_pypi_packages_download([package_name]) as m_pypi: + ref = TopPyPiReference() + packages = ref.get_packages() - def test__parse_retrieves_package_names(self) -> None: - data = {"rows": [{"project": "boto3"}, {"project": "requests"}]} - top_pypi = TopPyPiReference(source="foo", cache_handler=CacheHandler()) + assert packages == {"my-package"} + assert m_pypi.call_count == 1 + assert mock_get_packages_from_cache.call_count == 1 - assert top_pypi._parse(data) == {"boto3", "requests"} + def test_normalize_package_invalid_name_raises(self): + ref = TopPyPiReference() + with pytest.raises(PackageNormalizingError): + ref.normalize_packages({"INVALID PACKAGE NAME!"}) class TestTopNpmReference: - def test_get_trusted_packages_v2(self, tmp_path: Path) -> None: + def test_get_trusted_packages(self, tmp_path: Path) -> None: test_packages = ["foo", "bar", "react", "express", "lodash"] with patch_npm_packages_download(test_packages) as m_npm: @@ -184,22 +233,7 @@ def test_get_trusted_packages_v2(self, tmp_path: Path) -> None: assert packages == {"foo", "bar", "react", "express", "lodash"} assert m_npm.call_count == 1 - def test__parse_no_rows(self) -> None: - data = {"bananas": 5} - npm_ref = TopNpmReference(source="foo", cache_handler=CacheHandler()) - - with pytest.raises(InvalidReferenceFormatError, match="Invalid JSON format."): - npm_ref._parse(data) - - def 
test_empty_packages_list_exception(self) -> None: - with pytest.raises( - EmptyPackagesListError, - match="Downloaded packages list is empty", - ): - TopNpmReference._parse({"packages": []}) - - def test__parse_retrieves_package_names(self) -> None: - data = {"packages": [{"name": "react"}, {"name": "express"}]} - npm_ref = TopNpmReference(source="foo", cache_handler=CacheHandler()) - - assert npm_ref._parse(data) == {"react", "express"} + def test_normalize_package_invalid_name_raises(self): + ref = TopNpmReference() + with pytest.raises(PackageNormalizingError): + ref.normalize_packages({"INVALID PACKAGE NAME!"})