From a356e526356fbff60ccea868ce73b9746d3757e7 Mon Sep 17 00:00:00 2001 From: NucleonGodX Date: Mon, 9 Jun 2025 19:18:22 +0530 Subject: [PATCH] initial push to add clarity-based compliance support Signed-off-by: NucleonGodX --- scanpipe/apps.py | 6 ++--- scanpipe/models.py | 36 +++++++++++++++++++++------ scanpipe/pipes/compliance.py | 17 +++++++++++++ scanpipe/pipes/scancode.py | 7 ++++++ scanpipe/policies.py | 48 ++++++++++++++++++++++++++++++++++++ 5 files changed, 103 insertions(+), 11 deletions(-) diff --git a/scanpipe/apps.py b/scanpipe/apps.py index b679fe0d9d..e84ccbc618 100644 --- a/scanpipe/apps.py +++ b/scanpipe/apps.py @@ -39,6 +39,7 @@ from scanpipe.policies import load_policies_file from scanpipe.policies import make_license_policy_index +from scanpipe.policies import make_clarity_policy_index try: from importlib import metadata as importlib_metadata @@ -218,7 +219,7 @@ def get_scancode_licenses(self): def set_policies(self): """ - Compute and sets the `license_policies` on the app instance. + Compute and sets the `license_policies` and `clarity_policies` on the app instance. If the policies file is available but not formatted properly or doesn't include the proper content, we want to raise an exception while the app @@ -233,8 +234,7 @@ def set_policies(self): policies = load_policies_file(policies_file) logger.debug(style.SUCCESS(f"Loaded policies from {policies_file}")) self.license_policies_index = make_license_policy_index(policies) - else: - logger.debug(style.WARNING("Policies file not found.")) + self.clarity_policies_index = make_clarity_policy_index(policies) def sync_runs_and_jobs(self): """Synchronize ``QUEUED`` and ``RUNNING`` Run with their related Jobs.""" diff --git a/scanpipe/models.py b/scanpipe/models.py index 93e31448f0..4b990dbc9a 100644 --- a/scanpipe/models.py +++ b/scanpipe/models.py @@ -1503,30 +1503,50 @@ def get_policy_index(self): The policies are loaded from the following locations in that order: 1. the project local settings 2. the "policies.yml" file in the project input/ directory - 3. the global app settings license policies + 3. the global app settings license and clarity policies """ if policies_from_settings := self.get_env("policies"): policies_dict = policies_from_settings if isinstance(policies_from_settings, str): policies_dict = policies.load_policies_yaml(policies_from_settings) - return policies.make_license_policy_index(policies_dict) + return { + 'license_policies': policies.make_license_policy_index(policies_dict), + 'clarity_policies': policies.make_clarity_policy_index(policies_dict) + } elif policies_file := self.get_input_policies_file(): policies_dict = policies.load_policies_file(policies_file) - return policies.make_license_policy_index(policies_dict) + return { + 'license_policies': policies.make_license_policy_index(policies_dict), + 'clarity_policies': policies.make_clarity_policy_index(policies_dict) + } else: - return scanpipe_app.license_policies_index + return { + 'license_policies': scanpipe_app.license_policies_index, + 'clarity_policies': getattr(scanpipe_app, 'clarity_policies_index', []) + } @cached_property def policy_index(self): - """Return the cached policy index for this project instance.""" - return self.get_policy_index() + """Return the cached license policy index for this project instance""" + full_policies = self.get_policy_index() + return full_policies.get('license_policies', {}) + + @cached_property + def clarity_policy_index(self): + """Return clarity policies""" + full_policies = self.get_policy_index() + clarity_policies = full_policies.get('clarity_policies', []) + return clarity_policies @property def policies_enabled(self): - """Return True if the policies are enabled for this project.""" - return bool(self.policy_index) + """Return True if any policies (license or clarity) are enabled for this project.""" + full_policies = self.get_policy_index() + license_policies = full_policies.get('license_policies', {}) + clarity_policies = full_policies.get('clarity_policies', []) + return bool(license_policies or clarity_policies) class GroupingQuerySetMixin: diff --git a/scanpipe/pipes/compliance.py b/scanpipe/pipes/compliance.py index 0a0c0c776c..cd326b3a15 100644 --- a/scanpipe/pipes/compliance.py +++ b/scanpipe/pipes/compliance.py @@ -26,6 +26,7 @@ from scanpipe.models import ComplianceAlertMixin from scanpipe.pipes import flag from scanpipe.pipes import scancode +from scanpipe import policies """ A common compliance pattern for images is to store known licenses in a /licenses @@ -123,3 +124,19 @@ def get_project_compliance_alerts(project, fail_level="error"): } return project_compliance_alerts + +def add_clarity_compliance_to_summary(summary, project): + """ + Add clarity compliance alert to summary data. + Called from make_results_summary function. + """ + + clarity_score = summary.get("license_clarity_score", {}).get("score") + if clarity_score is None: + return summary + + clarity_policies = project.clarity_policy_index + clarity_compliance = policies.evaluate_clarity_compliance(clarity_score, clarity_policies) + + summary["clarity_compliance_alert"] = clarity_compliance + return summary diff --git a/scanpipe/pipes/scancode.py b/scanpipe/pipes/scancode.py index f3501c9eca..af6c84eb1b 100644 --- a/scanpipe/pipes/scancode.py +++ b/scanpipe/pipes/scancode.py @@ -934,6 +934,7 @@ def make_results_summary(project, scan_results_location): """ from scanpipe.api.serializers import CodebaseResourceSerializer from scanpipe.api.serializers import DiscoveredPackageSerializer + from scanpipe import policies with open(scan_results_location) as f: scan_data = json.load(f) @@ -964,4 +965,10 @@ def make_results_summary(project, scan_results_location): DiscoveredPackageSerializer(package).data for package in key_files_packages_qs ] + clarity_score = summary.get("license_clarity_score", {}).get("score") + if clarity_score is not None: + clarity_policies = project.clarity_policy_index + clarity_compliance = policies.evaluate_clarity_compliance(clarity_score, clarity_policies) + summary["clarity_compliance_alert"] = clarity_compliance + return summary diff --git a/scanpipe/policies.py b/scanpipe/policies.py index d0ea94e5c3..48d28dd2cd 100644 --- a/scanpipe/policies.py +++ b/scanpipe/policies.py @@ -54,6 +54,27 @@ def validate_policies(policies_dict): "The `license_policies` key is missing from provided policies data." ) + if "clarity_policies" in policies_dict: + clarity_policies = policies_dict["clarity_policies"] + if not isinstance(clarity_policies, list): + raise ValidationError("The `clarity_policies` must be a list.") + + for policy in clarity_policies: + if not isinstance(policy, dict): + raise ValidationError("Each clarity policy must be a dictionary.") + if "threshold" not in policy: + raise ValidationError("Each clarity policy must have a 'threshold' field.") + + threshold = policy["threshold"] + if isinstance(threshold, str): + try: + policy["threshold"] = float(threshold) if '.' in threshold else int(threshold) + except ValueError: + raise ValidationError(f"Clarity policy 'threshold' must be a valid number. Got: {threshold}") + + if not isinstance(policy["threshold"], (int, float)): + raise ValidationError("Clarity policy 'threshold' must be a number.") + return True @@ -63,3 +84,30 @@ def make_license_policy_index(policies_dict): license_policies = policies_dict.get("license_policies", []) return {policy.get("license_key"): policy for policy in license_policies} + + +def make_clarity_policy_index(policies_dict): + """Return a list of clarity policies sorted by threshold (descending).""" + if "clarity_policies" not in policies_dict: + return [] + + clarity_policies = policies_dict.get("clarity_policies", []) + return sorted(clarity_policies, key=lambda p: p.get("threshold", 0), reverse=True) + + +def evaluate_clarity_compliance(clarity_score, clarity_policies): + """ + Evaluate clarity score against policies and return compliance alert. + Returns the most appropriate compliance alert based on the score. + """ + if not clarity_policies: + return "" + + if clarity_score is None: + return "missing" + + for policy in clarity_policies: + if clarity_score >= policy.get("threshold", 0): + return policy.get("compliance_alert", "") + + return "error" \ No newline at end of file