From 5d9521021ca1c8b250b7a2fbeca15259ce5702d5 Mon Sep 17 00:00:00 2001 From: hemant-rgb Date: Wed, 24 Dec 2025 10:09:49 +0530 Subject: [PATCH 1/3] Add type hints and missing docstring for code quality improvements Signed-off-by: hemant-rgb --- scanpipe/pipes/__init__.py | 6 ++++-- scanpipe/pipes/vulnerablecode.py | 1 + scanpipe/views.py | 11 ++++++----- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/scanpipe/pipes/__init__.py b/scanpipe/pipes/__init__.py index 7fd885c36f..391ea0c2f8 100644 --- a/scanpipe/pipes/__init__.py +++ b/scanpipe/pipes/__init__.py @@ -26,8 +26,10 @@ import sys import time import uuid +from collections.abc import Callable from contextlib import suppress from datetime import datetime +from typing import Any from itertools import islice from pathlib import Path @@ -548,7 +550,7 @@ def get_resource_diff_ratio(resource_a, resource_b): ) -def poll_until_success(check, sleep=10, **kwargs): +def poll_until_success(check: Callable[..., Any], sleep: int = 10, **kwargs: Any) -> bool: """ Given a function `check`, which returns the status of a run, return True when the run instance has completed successfully. @@ -577,7 +579,7 @@ def poll_until_success(check, sleep=10, **kwargs): time.sleep(sleep) -def run_command_safely(command_args): +def run_command_safely(command_args: list[str]) -> str: """ Execute the external commands following security best practices. 
diff --git a/scanpipe/pipes/vulnerablecode.py b/scanpipe/pipes/vulnerablecode.py index 6c6073b5d0..e67fa7ce6d 100644 --- a/scanpipe/pipes/vulnerablecode.py +++ b/scanpipe/pipes/vulnerablecode.py @@ -118,6 +118,7 @@ def request_post( data, timeout=None, ): + """Wrap the HTTP POST request calls on the API.""" try: response = session.post(url, json=data, timeout=timeout) response.raise_for_status() diff --git a/scanpipe/views.py b/scanpipe/views.py index 9913d4947f..1fe9859f43 100644 --- a/scanpipe/views.py +++ b/scanpipe/views.py @@ -26,6 +26,7 @@ import operator from collections import Counter from contextlib import suppress +from typing import Any from django.apps import apps from django.conf import settings @@ -192,7 +193,7 @@ ] -def purldb_is_configured(*args): +def purldb_is_configured(*args: Any) -> bool: return purldb.is_configured() @@ -203,21 +204,21 @@ def get_queryset(self): return super().get_queryset().prefetch_related(*self.prefetch_related) -def render_as_yaml(value): +def render_as_yaml(value: Any) -> str | None: if value: return saneyaml.dump(value, indent=2) -def render_size(size_in_bytes): +def render_size(size_in_bytes: int | None) -> str | None: if size_in_bytes: return f"{size_in_bytes} ({filesizeformat(size_in_bytes)})" -def fields_have_no_values(fields_data): +def fields_have_no_values(fields_data: dict[str, Any]) -> bool: return not any([field_data.get("value") for field_data in fields_data.values()]) -def do_not_disable(*args, **kwargs): +def do_not_disable(*args: Any, **kwargs: Any) -> bool: return False From e0e2ef9f0fb61d5e2077a514bb2731f6cc84220a Mon Sep 17 00:00:00 2001 From: hemant-rgb Date: Wed, 24 Dec 2025 10:47:41 +0530 Subject: [PATCH 2/3] Improve docstring in vulnerablecode.py for clarity Signed-off-by: hemant-rgb --- scanpipe/pipes/vulnerablecode.py | 96 +++++++++++++++++++------------- 1 file changed, 56 insertions(+), 40 deletions(-) diff --git a/scanpipe/pipes/vulnerablecode.py b/scanpipe/pipes/vulnerablecode.py index 
e67fa7ce6d..1cc4978df6 100644 --- a/scanpipe/pipes/vulnerablecode.py +++ b/scanpipe/pipes/vulnerablecode.py @@ -21,6 +21,9 @@ # Visit https://github.com/aboutcode-org/scancode.io for support and download. import logging +from collections.abc import Callable +from collections.abc import Iterable +from typing import Any from django.conf import settings @@ -50,14 +53,14 @@ session.headers.update({"Authorization": f"Token {VULNERABLECODE_API_KEY}"}) -def is_configured(): +def is_configured() -> bool: """Return True if the required VulnerableCode settings have been set.""" if VULNERABLECODE_API_URL: return True return False -def is_available(): +def is_available() -> bool: """Return True if the configured VulnerableCode server is available.""" if not is_configured(): return False @@ -72,7 +75,7 @@ def is_available(): return response.status_code == requests.codes.ok -def chunked(iterable, chunk_size): +def chunked(iterable: list[Any], chunk_size: int) -> Iterable[list[Any]]: """ Break an `iterable` into lists of `chunk_size` length. 
@@ -86,19 +89,19 @@ def chunked(iterable, chunk_size): yield iterable[index:end] -def get_purls(packages): +def get_purls(packages: list[Any]) -> list[str]: """Return the PURLs for the given list of `packages`.""" return [package_url for package in packages if (package_url := package.package_url)] def request_get( - url, - payload=None, - timeout=None, -): + url: str | None, + payload: dict[str, Any] | None = None, + timeout: int | None = None, +) -> dict[str, Any] | None: """Wrap the HTTP request calls on the API.""" if not url: - return + return None params = {"format": "json"} if payload: @@ -111,13 +114,14 @@ def request_get( return response.json() except (requests.RequestException, ValueError, TypeError) as exception: logger.debug(f"{label} [Exception] {exception}") + return None def request_post( - url, - data, - timeout=None, -): + url: str, + data: dict[str, Any], + timeout: int | None = None, +) -> dict[str, Any] | None: """Wrap the HTTP POST request calls on the API.""" try: response = session.post(url, json=data, timeout=timeout) @@ -125,14 +129,15 @@ def request_post( return response.json() except (requests.RequestException, ValueError, TypeError) as exception: logger.debug(f"{label} [Exception] {exception}") + return None def _get_vulnerabilities( - url, - field_name, - field_value, - timeout=None, -): + url: str, + field_name: str, + field_value: str, + timeout: int | None = None, +) -> list[dict[str, Any]] | None: """Get the list of vulnerabilities.""" payload = {field_name: field_value} @@ -140,13 +145,14 @@ def _get_vulnerabilities( if response and response.get("count"): results = response["results"] return results + return None def get_vulnerabilities_by_purl( - purl, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): + purl: str, + timeout: int | None = None, + api_url: str | None = VULNERABLECODE_API_URL, +) -> list[dict[str, Any]] | None: """Get the list of vulnerabilities providing a package `purl`.""" return _get_vulnerabilities( 
url=f"{api_url}packages/", @@ -157,10 +163,10 @@ def get_vulnerabilities_by_purl( def get_vulnerabilities_by_cpe( - cpe, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): + cpe: str, + timeout: int | None = None, + api_url: str | None = VULNERABLECODE_API_URL, +) -> list[dict[str, Any]] | None: """Get the list of vulnerabilities providing a package or component `cpe`.""" return _get_vulnerabilities( url=f"{api_url}cpes/", @@ -171,10 +177,10 @@ def get_vulnerabilities_by_cpe( def bulk_search_by_purl( - purls, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): + purls: list[str], + timeout: int | None = None, + api_url: str | None = VULNERABLECODE_API_URL, +) -> list[dict[str, Any]] | None: """Bulk search of vulnerabilities using the provided list of `purls`.""" url = f"{api_url}packages/bulk_search" @@ -184,14 +190,18 @@ def bulk_search_by_purl( } logger.debug(f"VulnerableCode: url={url} purls_count={len(purls)}") - return request_post(url, data, timeout) + response = request_post(url, data, timeout) + # API returns a list of vulnerability data dicts, not a dict + if isinstance(response, list): + return response + return None def bulk_search_by_cpes( - cpes, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): + cpes: list[str], + timeout: int | None = None, + api_url: str | None = VULNERABLECODE_API_URL, +) -> dict[str, Any] | None: """Bulk search of vulnerabilities using the provided list of `cpes`.""" url = f"{api_url}cpes/bulk_search" @@ -203,7 +213,9 @@ def bulk_search_by_cpes( return request_post(url, data, timeout) -def filter_vulnerabilities(vulnerabilities, ignore_set): +def filter_vulnerabilities( + vulnerabilities: list[dict[str, Any]], ignore_set: set[str] +) -> list[dict[str, Any]]: """Filter out vulnerabilities based on a list of ignored IDs and aliases.""" return [ vulnerability @@ -214,8 +226,11 @@ def filter_vulnerabilities(vulnerabilities, ignore_set): def fetch_vulnerabilities( - packages, chunk_size=1000, logger=logger.info, 
ignore_set=None -): + packages: list[Any], + chunk_size: int = 1000, + logger: Callable[[str], None] = logger.info, + ignore_set: set[str] | None = None, +) -> None: """ Fetch and store vulnerabilities for each provided ``packages``. The PURLs are used for the lookups in batch of ``chunk_size`` per request. @@ -224,8 +239,9 @@ def fetch_vulnerabilities( for purls_batch in chunked(get_purls(packages), chunk_size): response_data = bulk_search_by_purl(purls_batch) - for vulnerability_data in response_data: - vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data + if response_data: + for vulnerability_data in response_data: + vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data unsaved_objects = [] for package in packages: From e2cc07470942c7806caab7c5fa8100d1f7b394fc Mon Sep 17 00:00:00 2001 From: hemant-rgb Date: Wed, 24 Dec 2025 11:09:04 +0530 Subject: [PATCH 3/3] more improvements related to typos, edge cases Signed-off-by: hemant-rgb --- scanpipe/pipes/__init__.py | 18 ++++++++++-------- scanpipe/pipes/output.py | 7 ++++++- scanpipe/pipes/scancode.py | 2 ++ scanpipe/views.py | 2 +- 4 files changed, 19 insertions(+), 10 deletions(-) diff --git a/scanpipe/pipes/__init__.py b/scanpipe/pipes/__init__.py index 391ea0c2f8..d565169ab4 100644 --- a/scanpipe/pipes/__init__.py +++ b/scanpipe/pipes/__init__.py @@ -34,6 +34,7 @@ from pathlib import Path from django.db.models import Count +from django.db.models import QuerySet from scanpipe.models import AbstractTaskFieldsModel from scanpipe.models import CodebaseRelation @@ -484,23 +485,23 @@ def make_relation(from_resource, to_resource, map_type, **extra_fields): ) -def normalize_path(path): +def normalize_path(path: str) -> str: """Return a normalized path from a `path` string.""" return "/" + path.strip("/") -def strip_root(location): +def strip_root(location: str | Path) -> str: """Return the provided `location` without the root directory.""" return
"/".join(str(location).strip("/").split("/")[1:]) -def filename_now(sep="-"): +def filename_now(sep: str = "-") -> str: """Return the current date and time in iso format suitable for filename.""" now = datetime.now().isoformat(sep=sep, timespec="seconds") return now.replace(":", sep) -def count_group_by(queryset, field_name): +def count_group_by(queryset: QuerySet[Any], field_name: str) -> dict[Any, int]: """ Return a summary of all existing values for the provided `field_name` on the `queryset`, including the count of each entry, as a dictionary. @@ -514,12 +515,12 @@ def count_group_by(queryset, field_name): return {entry.get(field_name): entry.get("count") for entry in counts} -def get_bin_executable(filename): +def get_bin_executable(filename: str) -> str: """Return the location of the `filename` executable binary.""" return str(Path(sys.executable).parent / filename) -def get_text_str_diff_ratio(str_a, str_b): +def get_text_str_diff_ratio(str_a: str, str_b: str) -> float | None: """ Return a similarity ratio as a float between 0 and 1 by comparing the text content of the ``str_a`` and ``str_b``. @@ -527,7 +528,7 @@ def get_text_str_diff_ratio(str_a, str_b): Return None if any of the two resources str is empty. """ if not (str_a and str_b): - return + return None if not isinstance(str_a, str) or not isinstance(str_b, str): raise ValueError("Values must be str") @@ -536,7 +537,7 @@ def get_text_str_diff_ratio(str_a, str_b): return matcher.quick_ratio() -def get_resource_diff_ratio(resource_a, resource_b): +def get_resource_diff_ratio(resource_a: CodebaseResource, resource_b: CodebaseResource) -> float | None: """ Return a similarity ratio as a float between 0 and 1 by comparing the text content of the CodebaseResource ``resource_a`` and ``resource_b``. 
@@ -548,6 +549,7 @@ def get_resource_diff_ratio(resource_a, resource_b): str_a=resource_a.file_content, str_b=resource_b.file_content, ) + return None def poll_until_success(check: Callable[..., Any], sleep: int = 10, **kwargs: Any) -> bool: diff --git a/scanpipe/pipes/output.py b/scanpipe/pipes/output.py index 03fc4ca51c..de7e4cce7b 100644 --- a/scanpipe/pipes/output.py +++ b/scanpipe/pipes/output.py @@ -494,7 +494,12 @@ def _adapt_value_for_xlsx(fieldname, value, maximum_length=32767, _adapt=True): # we only get this key in each dict of a list for some fields mapping_key = mappings_key_by_fieldname.get(fieldname) if mapping_key: - value = [mapping[mapping_key] for mapping in value] + # Use .get() to safely access keys and filter out None values + value = [ + mapping.get(mapping_key) + for mapping in value + if mapping.get(mapping_key) is not None + ] # convert these to text lines, remove duplicates if isinstance(value, list | tuple): diff --git a/scanpipe/pipes/scancode.py b/scanpipe/pipes/scancode.py index 609e86b69c..00f4dc3ae2 100644 --- a/scanpipe/pipes/scancode.py +++ b/scanpipe/pipes/scancode.py @@ -1162,6 +1162,8 @@ def make_results_summary(project, scan_results_location): scan_data = json.load(f) summary = scan_data.get("summary") + if not summary: + raise ValueError(f"Missing 'summary' in scan results at {scan_results_location}") # Inject the generated `license_matches` in the summary from the project # codebase resources. diff --git a/scanpipe/views.py b/scanpipe/views.py index 1fe9859f43..32d1876475 100644 --- a/scanpipe/views.py +++ b/scanpipe/views.py @@ -235,7 +235,7 @@ def do_not_disable(*args: Any, **kwargs: Any) -> bool: ] -def is_displayable_image_type(resource): +def is_displayable_image_type(resource: CodebaseResource) -> bool: """Return True if the ``resource`` file is supported by the HTML tag.""" return resource.mime_type and resource.mime_type in DISPLAYABLE_IMAGE_MIME_TYPE