Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions scanpipe/pipes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
import sys
import time
import uuid
from collections.abc import Callable
from contextlib import suppress
from datetime import datetime
from typing import Any
from itertools import islice
from pathlib import Path

from django.db.models import Count
from django.db.models import QuerySet

from scanpipe.models import AbstractTaskFieldsModel
from scanpipe.models import CodebaseRelation
Expand Down Expand Up @@ -482,23 +485,23 @@ def make_relation(from_resource, to_resource, map_type, **extra_fields):
)


def normalize_path(path: str) -> str:
    """
    Return a normalized path from a `path` string.

    Any leading and trailing "/" are stripped, then a single leading "/" is
    added back. An empty string therefore normalizes to "/".
    """
    return "/" + path.strip("/")


def strip_root(location):
def strip_root(location: str | Path) -> str:
"""Return the provided `location` without the root directory."""
return "/".join(str(location).strip("/").split("/")[1:])


def filename_now(sep: str = "-") -> str:
    """
    Return the current date and time in iso format suitable for filename.

    The `sep` is used between the date and the time parts and also replaces
    the ":" of the time part. It must be a single character, as required by
    ``datetime.isoformat``.
    """
    now = datetime.now().isoformat(sep=sep, timespec="seconds")
    return now.replace(":", sep)


def count_group_by(queryset, field_name):
def count_group_by(queryset: QuerySet[Any], field_name: str) -> dict[Any, int]:
"""
Return a summary of all existing values for the provided `field_name` on the
`queryset`, including the count of each entry, as a dictionary.
Expand All @@ -512,20 +515,20 @@ def count_group_by(queryset, field_name):
return {entry.get(field_name): entry.get("count") for entry in counts}


def get_bin_executable(filename: str) -> str:
    """
    Return the location of the `filename` executable binary.

    The location is built by joining `filename` to the directory that holds
    the running Python interpreter (``sys.executable``). The path is not
    checked for existence.
    """
    bin_directory = Path(sys.executable).parent
    return str(bin_directory / filename)


def get_text_str_diff_ratio(str_a, str_b):
def get_text_str_diff_ratio(str_a: str, str_b: str) -> float | None:
"""
Return a similarity ratio as a float between 0 and 1 by comparing the
text content of the ``str_a`` and ``str_b``.

Return None if any of the two resources str is empty.
"""
if not (str_a and str_b):
return
return None

if not isinstance(str_a, str) or not isinstance(str_b, str):
raise ValueError("Values must be str")
Expand All @@ -534,7 +537,7 @@ def get_text_str_diff_ratio(str_a, str_b):
return matcher.quick_ratio()


def get_resource_diff_ratio(resource_a, resource_b):
def get_resource_diff_ratio(resource_a: CodebaseResource, resource_b: CodebaseResource) -> float | None:
"""
Return a similarity ratio as a float between 0 and 1 by comparing the
text content of the CodebaseResource ``resource_a`` and ``resource_b``.
Expand All @@ -546,9 +549,10 @@ def get_resource_diff_ratio(resource_a, resource_b):
str_a=resource_a.file_content,
str_b=resource_b.file_content,
)
return None


def poll_until_success(check, sleep=10, **kwargs):
def poll_until_success(check: Callable[..., Any], sleep: int = 10, **kwargs: Any) -> bool:
"""
Given a function `check`, which returns the status of a run, return True
when the run instance has completed successfully.
Expand Down Expand Up @@ -577,7 +581,7 @@ def poll_until_success(check, sleep=10, **kwargs):
time.sleep(sleep)


def run_command_safely(command_args):
def run_command_safely(command_args: list[str]) -> str:
"""
Execute the external commands following security best practices.

Expand Down
7 changes: 6 additions & 1 deletion scanpipe/pipes/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,12 @@ def _adapt_value_for_xlsx(fieldname, value, maximum_length=32767, _adapt=True):
# we only get this key in each dict of a list for some fields
mapping_key = mappings_key_by_fieldname.get(fieldname)
if mapping_key:
value = [mapping[mapping_key] for mapping in value]
# Use .get() to safely access keys and filter out None values
value = [
mapping.get(mapping_key)
for mapping in value
if mapping.get(mapping_key) is not None
]

# convert these to text lines, remove duplicates
if isinstance(value, list | tuple):
Expand Down
2 changes: 2 additions & 0 deletions scanpipe/pipes/scancode.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,8 @@ def make_results_summary(project, scan_results_location):
scan_data = json.load(f)

summary = scan_data.get("summary")
if not summary:
raise ValueError(f"Missing 'summary' in scan results at {scan_results_location}")

# Inject the generated `license_matches` in the summary from the project
# codebase resources.
Expand Down
97 changes: 57 additions & 40 deletions scanpipe/pipes/vulnerablecode.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
# Visit https://github.com/aboutcode-org/scancode.io for support and download.

import logging
from collections.abc import Callable
from collections.abc import Iterable
from typing import Any

from django.conf import settings

Expand Down Expand Up @@ -50,14 +53,14 @@
session.headers.update({"Authorization": f"Token {VULNERABLECODE_API_KEY}"})


def is_configured() -> bool:
    """Return True if the required VulnerableCode settings have been set."""
    # Only the API URL is required; any truthy value counts as configured.
    return bool(VULNERABLECODE_API_URL)


def is_available():
def is_available() -> bool:
"""Return True if the configured VulnerableCode server is available."""
if not is_configured():
return False
Expand All @@ -72,7 +75,7 @@ def is_available():
return response.status_code == requests.codes.ok


def chunked(iterable, chunk_size):
def chunked(iterable: list[Any], chunk_size: int) -> Iterable[list[Any]]:
"""
Break an `iterable` into lists of `chunk_size` length.

Expand All @@ -86,19 +89,19 @@ def chunked(iterable, chunk_size):
yield iterable[index:end]


def get_purls(packages: list[Any]) -> list[str]:
    """
    Return the PURLs for the given list of `packages`.

    Packages whose ``package_url`` is empty or None are skipped.
    """
    # The walrus binds `package_url` once so the attribute is read a single
    # time per package.
    return [package_url for package in packages if (package_url := package.package_url)]


def request_get(
url,
payload=None,
timeout=None,
):
url: str | None,
payload: dict[str, Any] | None = None,
timeout: int | None = None,
) -> dict[str, Any] | None:
"""Wrap the HTTP request calls on the API."""
if not url:
return
return None

params = {"format": "json"}
if payload:
Expand All @@ -111,41 +114,45 @@ def request_get(
return response.json()
except (requests.RequestException, ValueError, TypeError) as exception:
logger.debug(f"{label} [Exception] {exception}")
return None


def request_post(
    url: str,
    data: dict[str, Any],
    timeout: int | None = None,
) -> dict[str, Any] | list[Any] | None:
    """
    Wrap the HTTP POST request calls on the API.

    The `data` is sent as the JSON body of the request. Return the decoded
    JSON response, which may be a dict or a list depending on the endpoint.
    Return None when the request fails, the response has an error status, or
    the body cannot be decoded; the exception is logged at debug level.
    """
    try:
        response = session.post(url, json=data, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError, TypeError) as exception:
        logger.debug(f"{label} [Exception] {exception}")
        return None


def _get_vulnerabilities(
    url: str,
    field_name: str,
    field_value: str,
    timeout: int | None = None,
) -> list[dict[str, Any]] | None:
    """
    Get the list of vulnerabilities.

    A GET request is sent to `url` with ``{field_name: field_value}`` as the
    payload. Return the ``results`` entry of the response when its ``count``
    is truthy, and None otherwise (including when the request failed).
    """
    payload = {field_name: field_value}

    response = request_get(url=url, payload=payload, timeout=timeout)
    if response and response.get("count"):
        return response["results"]
    return None


def get_vulnerabilities_by_purl(
purl,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
purl: str,
timeout: int | None = None,
api_url: str | None = VULNERABLECODE_API_URL,
) -> list[dict[str, Any]] | None:
"""Get the list of vulnerabilities providing a package `purl`."""
return _get_vulnerabilities(
url=f"{api_url}packages/",
Expand All @@ -156,10 +163,10 @@ def get_vulnerabilities_by_purl(


def get_vulnerabilities_by_cpe(
cpe,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
cpe: str,
timeout: int | None = None,
api_url: str | None = VULNERABLECODE_API_URL,
) -> list[dict[str, Any]] | None:
"""Get the list of vulnerabilities providing a package or component `cpe`."""
return _get_vulnerabilities(
url=f"{api_url}cpes/",
Expand All @@ -170,10 +177,10 @@ def get_vulnerabilities_by_cpe(


def bulk_search_by_purl(
purls,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
purls: list[str],
timeout: int | None = None,
api_url: str | None = VULNERABLECODE_API_URL,
) -> list[dict[str, Any]] | None:
"""Bulk search of vulnerabilities using the provided list of `purls`."""
url = f"{api_url}packages/bulk_search"

Expand All @@ -183,14 +190,18 @@ def bulk_search_by_purl(
}

logger.debug(f"VulnerableCode: url={url} purls_count={len(purls)}")
return request_post(url, data, timeout)
response = request_post(url, data, timeout)
# API returns a list of vulnerability data dicts, not a dict
if isinstance(response, list):
return response
return None


def bulk_search_by_cpes(
cpes,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
cpes: list[str],
timeout: int | None = None,
api_url: str | None = VULNERABLECODE_API_URL,
) -> dict[str, Any] | None:
"""Bulk search of vulnerabilities using the provided list of `cpes`."""
url = f"{api_url}cpes/bulk_search"

Expand All @@ -202,7 +213,9 @@ def bulk_search_by_cpes(
return request_post(url, data, timeout)


def filter_vulnerabilities(vulnerabilities, ignore_set):
def filter_vulnerabilities(
vulnerabilities: list[dict[str, Any]], ignore_set: set[str]
) -> list[dict[str, Any]]:
"""Filter out vulnerabilities based on a list of ignored IDs and aliases."""
return [
vulnerability
Expand All @@ -213,8 +226,11 @@ def filter_vulnerabilities(vulnerabilities, ignore_set):


def fetch_vulnerabilities(
packages, chunk_size=1000, logger=logger.info, ignore_set=None
):
packages: list[Any],
chunk_size: int = 1000,
logger: Callable[[str], None] = logger.info,
ignore_set: set[str] | None = None,
) -> None:
"""
Fetch and store vulnerabilities for each provided ``packages``.
The PURLs are used for the lookups in batch of ``chunk_size`` per request.
Expand All @@ -223,8 +239,9 @@ def fetch_vulnerabilities(

for purls_batch in chunked(get_purls(packages), chunk_size):
response_data = bulk_search_by_purl(purls_batch)
for vulnerability_data in response_data:
vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data
if response_data:
for vulnerability_data in response_data:
vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data

unsaved_objects = []
for package in packages:
Expand Down
13 changes: 7 additions & 6 deletions scanpipe/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import operator
from collections import Counter
from contextlib import suppress
from typing import Any

from django.apps import apps
from django.conf import settings
Expand Down Expand Up @@ -192,7 +193,7 @@
]


def purldb_is_configured(*args: Any) -> bool:
    """
    Return the result of ``purldb.is_configured()``.

    Any positional arguments are accepted and ignored.
    """
    return purldb.is_configured()


Expand All @@ -203,21 +204,21 @@ def get_queryset(self):
return super().get_queryset().prefetch_related(*self.prefetch_related)


def render_as_yaml(value):
def render_as_yaml(value: Any) -> str | None:
if value:
return saneyaml.dump(value, indent=2)


def render_size(size_in_bytes):
def render_size(size_in_bytes: int | None) -> str | None:
if size_in_bytes:
return f"{size_in_bytes} ({filesizeformat(size_in_bytes)})"


def fields_have_no_values(fields_data: dict[str, Any]) -> bool:
    """
    Return True if none of the `fields_data` entries has a truthy "value".

    Entries without a "value" key are treated as having no value.
    """
    # A generator lets `any` stop at the first truthy value instead of
    # building a full list first.
    return not any(field_data.get("value") for field_data in fields_data.values())


def do_not_disable(*args: Any, **kwargs: Any) -> bool:
    """Return False regardless of the provided arguments."""
    return False


Expand All @@ -234,7 +235,7 @@ def do_not_disable(*args, **kwargs):
]


def is_displayable_image_type(resource: CodebaseResource) -> bool:
    """Return True if the ``resource`` file is supported by the HTML <img> tag."""
    mime_type = resource.mime_type
    # Coerce to a real bool: a bare `and` would return the falsy `mime_type`
    # itself (None or "") instead of False.
    return bool(mime_type and mime_type in DISPLAYABLE_IMAGE_MIME_TYPE)

Expand Down
Loading