Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions scanpipe/pipes/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
from commoncode import command
from commoncode.hash import multi_checksums
from commoncode.text import python_safe_name
from fetchcode.pypi import Pypi as PyPIFetcher
from fetchcode import fetch as fetchcode_fetch
from packageurl import PackageURL
from packageurl.contrib import purl2url
from plugincode.location_provider import get_location
from requests import auth as request_auth

Expand Down Expand Up @@ -325,20 +324,28 @@ def fetch_git_repo(url, to=None):

def fetch_package_url(url):
# Ensure the provided Package URL is valid, or raise a ValueError.
purl = PackageURL.from_string(url)
PackageURL.from_string(url)

# Resolve a Download URL using purl2url.
if download_url := purl2url.get_download_url(url):
return fetch_http(download_url)
try:
result = fetchcode_fetch(url)
except Exception as e:
raise ValueError(f"Could not fetch package for {url}: {e}")

# PyPI is not supported by purl2url.
# It requires an API call to resolve download URLs.
if purl.type == "pypi":
if download_url := PyPIFetcher.get_download_url(url, preferred_type="sdist"):
return fetch_http(download_url)
if not result or not result.location:
raise ValueError(f"Could not resolve a download URL for {url}.")

raise ValueError(f"Could not resolve a download URL for {url}.")
path = Path(result.location)
checksums = multi_checksums(path, ("md5", "sha1"))

return Download(
uri=url,
directory=str(path.parent),
filename=path.name,
path=path,
size=path.stat().st_size,
sha1=checksums["sha1"],
md5=checksums["md5"],
)

SCHEME_TO_FETCHER_MAPPING = {
"http": fetch_http,
Expand Down
77 changes: 60 additions & 17 deletions scanpipe/tests/pipes/test_fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,38 +89,81 @@ def test_scanpipe_pipes_fetch_http(self, mock_get):
mock_get.return_value = make_mock_response(url=url, headers=headers)
downloaded_file = fetch.fetch_http(url)
self.assertTrue(Path(downloaded_file.directory, "another_name.zip").exists())


@mock.patch("scanpipe.pipes.fetch.fetchcode_fetch")
@mock.patch("requests.sessions.Session.get")
def test_scanpipe_pipes_fetch_package_url(self, mock_get):
def test_scanpipe_pipes_fetch_package_url(self, mock_get, mock_fetchcode_fetch):
package_url = "pkg:not_a_valid_purl"
with self.assertRaises(ValueError) as cm:
fetch.fetch_package_url(package_url)
expected = f"purl is missing the required type component: '{package_url}'."
self.assertEqual(expected, str(cm.exception))

package_url = "pkg:generic/name@version"
mock_fetchcode_fetch.side_effect = ValueError("Could not resolve PURL to a valid URL.")
with self.assertRaises(ValueError) as cm:
fetch.fetch_package_url(package_url)
expected = f"Could not resolve a download URL for {package_url}."
fetch.fetch_package_url(package_url)
expected = f"Could not fetch package for {package_url}: Could not resolve PURL to a valid URL."
self.assertEqual(expected, str(cm.exception))

package_url = "pkg:npm/d3@5.8.0"
mock_get.return_value = make_mock_response(url="https://exa.com/filename.zip")
downloaded_file = fetch.fetch_package_url(package_url)
self.assertTrue(Path(downloaded_file.directory, "filename.zip").exists())

@mock.patch("fetchcode.pypi.fetch_json_response")
@mock.patch("requests.sessions.Session.get")
def test_scanpipe_pipes_fetch_pypi_package_url(self, mock_get, mock_fetch_json):
mock_fetchcode_fetch.side_effect = None
mock_result = mock.Mock()
mock_result.location = "/tmp/fakedir/filename.zip"
mock_fetchcode_fetch.return_value = mock_result

with mock.patch("scanpipe.pipes.fetch.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 1234
with mock.patch("scanpipe.pipes.fetch.multi_checksums") as mock_checksums:
mock_checksums.return_value = {"sha1": "abc", "md5": "def"}
downloaded_file = fetch.fetch_package_url(package_url)

self.assertEqual(package_url, downloaded_file.uri)
self.assertEqual("filename.zip", downloaded_file.filename)

@mock.patch("scanpipe.pipes.fetch.fetchcode_fetch")
def test_scanpipe_pipes_fetch_pypi_package_url(self, mock_fetchcode_fetch):
package_url = "pkg:pypi/django@5.2"
download_url = "https://files.pythonhosted.org/packages/Django-5.2.tar.gz"

mock_get.return_value = make_mock_response(url=download_url)
mock_fetch_json.return_value = {"urls": [{"url": download_url}]}
mock_result = mock.Mock()
mock_result.location = "/tmp/fakedir/Django-5.2.tar.gz"
mock_fetchcode_fetch.return_value = mock_result

with mock.patch("scanpipe.pipes.fetch.Path.stat") as mock_stat:
mock_stat.return_value.st_size = 1234
with mock.patch("scanpipe.pipes.fetch.multi_checksums") as mock_checksums:
mock_checksums.return_value = {"sha1": "abc", "md5": "def"}
downloaded_file = fetch.fetch_package_url(package_url)

downloaded_file = fetch.fetch_package_url(package_url)
self.assertEqual(download_url, mock_get.call_args[0][0])
self.assertTrue(Path(downloaded_file.directory, "Django-5.2.tar.gz").exists())
self.assertEqual(package_url, downloaded_file.uri)
self.assertEqual("Django-5.2.tar.gz", downloaded_file.filename)
mock_fetchcode_fetch.assert_called_once_with(package_url)

@mock.patch("scanpipe.pipes.fetch.fetchcode_fetch")
def test_scanpipe_pipes_fetch_package_url_returns_none(self, mock_fetchcode_fetch):
mock_fetchcode_fetch.return_value = None
with self.assertRaises(ValueError) as cm:
fetch.fetch_package_url("pkg:pypi/django@5.2")
expected = "Could not resolve a download URL for pkg:pypi/django@5.2."
self.assertEqual(expected, str(cm.exception))

@mock.patch("scanpipe.pipes.fetch.fetchcode_fetch")
def test_scanpipe_pipes_fetch_package_url_no_location(self, mock_fetchcode_fetch):
mock_result = mock.Mock()
mock_result.location = None
mock_fetchcode_fetch.return_value = mock_result
with self.assertRaises(ValueError) as cm:
fetch.fetch_package_url("pkg:pypi/django@5.2")
expected = "Could not resolve a download URL for pkg:pypi/django@5.2."
self.assertEqual(expected, str(cm.exception))

@mock.patch("scanpipe.pipes.fetch.fetchcode_fetch")
def test_scanpipe_pipes_fetch_package_url_fetchcode_exception(self, mock_fetchcode_fetch):
mock_fetchcode_fetch.side_effect = Exception("network error")
with self.assertRaises(ValueError) as cm:
fetch.fetch_package_url("pkg:pypi/django@5.2")
expected = "Could not fetch package for pkg:pypi/django@5.2: network error"
self.assertEqual(expected, str(cm.exception))

@mock.patch("scanpipe.pipes.fetch.get_docker_image_platform")
@mock.patch("scanpipe.pipes.fetch._get_skopeo_location")
Expand Down
Loading