diff --git a/scanpipe/pipes/fetch.py b/scanpipe/pipes/fetch.py index f1b249fec9..99568d73bb 100644 --- a/scanpipe/pipes/fetch.py +++ b/scanpipe/pipes/fetch.py @@ -38,9 +38,8 @@ from commoncode import command from commoncode.hash import multi_checksums from commoncode.text import python_safe_name -from fetchcode.pypi import Pypi as PyPIFetcher +from fetchcode import fetch as fetchcode_fetch from packageurl import PackageURL -from packageurl.contrib import purl2url from plugincode.location_provider import get_location from requests import auth as request_auth @@ -325,20 +324,28 @@ def fetch_git_repo(url, to=None): def fetch_package_url(url): # Ensure the provided Package URL is valid, or raise a ValueError. - purl = PackageURL.from_string(url) + PackageURL.from_string(url) - # Resolve a Download URL using purl2url. - if download_url := purl2url.get_download_url(url): - return fetch_http(download_url) + try: + result = fetchcode_fetch(url) + except Exception as e: + raise ValueError(f"Could not fetch package for {url}: {e}") - # PyPI is not supported by purl2url. - # It requires an API call to resolve download URLs. - if purl.type == "pypi": - if download_url := PyPIFetcher.get_download_url(url, preferred_type="sdist"): - return fetch_http(download_url) + if not result or not result.location: + raise ValueError(f"Could not resolve a download URL for {url}.") - raise ValueError(f"Could not resolve a download URL for {url}.") + path = Path(result.location) + checksums = multi_checksums(path, ("md5", "sha1")) + return Download( + uri=url, + directory=str(path.parent), + filename=path.name, + path=path, + size=path.stat().st_size, + sha1=checksums["sha1"], + md5=checksums["md5"], + ) SCHEME_TO_FETCHER_MAPPING = { "http": fetch_http, diff --git a/scanpipe/tests/pipes/test_fetch.py b/scanpipe/tests/pipes/test_fetch.py index 6157d2a026..3360a7e29c 100644 --- a/scanpipe/tests/pipes/test_fetch.py +++ b/scanpipe/tests/pipes/test_fetch.py @@ -89,9 +89,10 @@ def test_scanpipe_pipes_fetch_http(self, mock_get): mock_get.return_value = make_mock_response(url=url, headers=headers) downloaded_file = fetch.fetch_http(url) self.assertTrue(Path(downloaded_file.directory, "another_name.zip").exists()) - + + @mock.patch("scanpipe.pipes.fetch.fetchcode_fetch") @mock.patch("requests.sessions.Session.get") - def test_scanpipe_pipes_fetch_package_url(self, mock_get): + def test_scanpipe_pipes_fetch_package_url(self, mock_get, mock_fetchcode_fetch): package_url = "pkg:not_a_valid_purl" with self.assertRaises(ValueError) as cm: fetch.fetch_package_url(package_url) @@ -99,28 +100,70 @@ def test_scanpipe_pipes_fetch_package_url(self, mock_get): self.assertEqual(expected, str(cm.exception)) package_url = "pkg:generic/name@version" + mock_fetchcode_fetch.side_effect = ValueError("Could not resolve PURL to a valid URL.") with self.assertRaises(ValueError) as cm: - fetch.fetch_package_url(package_url) - expected = f"Could not resolve a download URL for {package_url}." + fetch.fetch_package_url(package_url) + expected = f"Could not fetch package for {package_url}: Could not resolve PURL to a valid URL." self.assertEqual(expected, str(cm.exception)) package_url = "pkg:npm/d3@5.8.0" - mock_get.return_value = make_mock_response(url="https://exa.com/filename.zip") - downloaded_file = fetch.fetch_package_url(package_url) - self.assertTrue(Path(downloaded_file.directory, "filename.zip").exists()) - - @mock.patch("fetchcode.pypi.fetch_json_response") - @mock.patch("requests.sessions.Session.get") - def test_scanpipe_pipes_fetch_pypi_package_url(self, mock_get, mock_fetch_json): + mock_fetchcode_fetch.side_effect = None + mock_result = mock.Mock() + mock_result.location = "/tmp/fakedir/filename.zip" + mock_fetchcode_fetch.return_value = mock_result + + with mock.patch("scanpipe.pipes.fetch.Path.stat") as mock_stat: + mock_stat.return_value.st_size = 1234 + with mock.patch("scanpipe.pipes.fetch.multi_checksums") as mock_checksums: + mock_checksums.return_value = {"sha1": "abc", "md5": "def"} + downloaded_file = fetch.fetch_package_url(package_url) + + self.assertEqual(package_url, downloaded_file.uri) + self.assertEqual("filename.zip", downloaded_file.filename) + + @mock.patch("scanpipe.pipes.fetch.fetchcode_fetch") + def test_scanpipe_pipes_fetch_pypi_package_url(self, mock_fetchcode_fetch): package_url = "pkg:pypi/django@5.2" - download_url = "https://files.pythonhosted.org/packages/Django-5.2.tar.gz" - mock_get.return_value = make_mock_response(url=download_url) - mock_fetch_json.return_value = {"urls": [{"url": download_url}]} + mock_result = mock.Mock() + mock_result.location = "/tmp/fakedir/Django-5.2.tar.gz" + mock_fetchcode_fetch.return_value = mock_result + + with mock.patch("scanpipe.pipes.fetch.Path.stat") as mock_stat: + mock_stat.return_value.st_size = 1234 + with mock.patch("scanpipe.pipes.fetch.multi_checksums") as mock_checksums: + mock_checksums.return_value = {"sha1": "abc", "md5": "def"} + downloaded_file = fetch.fetch_package_url(package_url) - downloaded_file = fetch.fetch_package_url(package_url) - self.assertEqual(download_url, mock_get.call_args[0][0]) - self.assertTrue(Path(downloaded_file.directory, "Django-5.2.tar.gz").exists()) + self.assertEqual(package_url, downloaded_file.uri) + self.assertEqual("Django-5.2.tar.gz", downloaded_file.filename) + mock_fetchcode_fetch.assert_called_once_with(package_url) + + @mock.patch("scanpipe.pipes.fetch.fetchcode_fetch") + def test_scanpipe_pipes_fetch_package_url_returns_none(self, mock_fetchcode_fetch): + mock_fetchcode_fetch.return_value = None + with self.assertRaises(ValueError) as cm: + fetch.fetch_package_url("pkg:pypi/django@5.2") + expected = "Could not resolve a download URL for pkg:pypi/django@5.2." + self.assertEqual(expected, str(cm.exception)) + + @mock.patch("scanpipe.pipes.fetch.fetchcode_fetch") + def test_scanpipe_pipes_fetch_package_url_no_location(self, mock_fetchcode_fetch): + mock_result = mock.Mock() + mock_result.location = None + mock_fetchcode_fetch.return_value = mock_result + with self.assertRaises(ValueError) as cm: + fetch.fetch_package_url("pkg:pypi/django@5.2") + expected = "Could not resolve a download URL for pkg:pypi/django@5.2." + self.assertEqual(expected, str(cm.exception)) + + @mock.patch("scanpipe.pipes.fetch.fetchcode_fetch") + def test_scanpipe_pipes_fetch_package_url_fetchcode_exception(self, mock_fetchcode_fetch): + mock_fetchcode_fetch.side_effect = Exception("network error") + with self.assertRaises(ValueError) as cm: + fetch.fetch_package_url("pkg:pypi/django@5.2") + expected = "Could not fetch package for pkg:pypi/django@5.2: network error" + self.assertEqual(expected, str(cm.exception)) @mock.patch("scanpipe.pipes.fetch.get_docker_image_platform") @mock.patch("scanpipe.pipes.fetch._get_skopeo_location")