Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions scanpipe/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

from scanpipe.policies import load_policies_file
from scanpipe.policies import make_license_policy_index
from scanpipe.policies import make_clarity_policy_index

try:
from importlib import metadata as importlib_metadata
Expand Down Expand Up @@ -218,7 +219,7 @@ def get_scancode_licenses(self):

def set_policies(self):
"""
Compute and sets the `license_policies` on the app instance.
Compute and sets the `license_policies` and `clarity_policies` on the app instance.

If the policies file is available but not formatted properly or doesn't
include the proper content, we want to raise an exception while the app
Expand All @@ -233,8 +234,7 @@ def set_policies(self):
policies = load_policies_file(policies_file)
logger.debug(style.SUCCESS(f"Loaded policies from {policies_file}"))
self.license_policies_index = make_license_policy_index(policies)
else:
logger.debug(style.WARNING("Policies file not found."))
self.clarity_policies_index = make_clarity_policy_index(policies)

def sync_runs_and_jobs(self):
"""Synchronize ``QUEUED`` and ``RUNNING`` Run with their related Jobs."""
Expand Down
36 changes: 28 additions & 8 deletions scanpipe/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1503,30 +1503,50 @@ def get_policy_index(self):
The policies are loaded from the following locations in that order:
1. the project local settings
2. the "policies.yml" file in the project input/ directory
3. the global app settings license policies
3. the global app settings license and clarity policies
"""
if policies_from_settings := self.get_env("policies"):
policies_dict = policies_from_settings
if isinstance(policies_from_settings, str):
policies_dict = policies.load_policies_yaml(policies_from_settings)
return policies.make_license_policy_index(policies_dict)
return {
'license_policies': policies.make_license_policy_index(policies_dict),
'clarity_policies': policies.make_clarity_policy_index(policies_dict)
}

elif policies_file := self.get_input_policies_file():
policies_dict = policies.load_policies_file(policies_file)
return policies.make_license_policy_index(policies_dict)
return {
'license_policies': policies.make_license_policy_index(policies_dict),
'clarity_policies': policies.make_clarity_policy_index(policies_dict)
}

else:
return scanpipe_app.license_policies_index
return {
'license_policies': scanpipe_app.license_policies_index,
'clarity_policies': getattr(scanpipe_app, 'clarity_policies_index', [])
}

@cached_property
def policy_index(self):
"""Return the cached policy index for this project instance."""
return self.get_policy_index()
"""Return the cached license policy index for this project instance"""
full_policies = self.get_policy_index()
return full_policies.get('license_policies', {})

@cached_property
def clarity_policy_index(self):
"""Return clarity policies"""
full_policies = self.get_policy_index()
clarity_policies = full_policies.get('clarity_policies', [])
return clarity_policies

@property
def policies_enabled(self):
"""Return True if the policies are enabled for this project."""
return bool(self.policy_index)
"""Return True if any policies (license or clarity) are enabled for this project."""
full_policies = self.get_policy_index()
license_policies = full_policies.get('license_policies', {})
clarity_policies = full_policies.get('clarity_policies', [])
return bool(license_policies or clarity_policies)


class GroupingQuerySetMixin:
Expand Down
17 changes: 17 additions & 0 deletions scanpipe/pipes/compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from scanpipe.models import ComplianceAlertMixin
from scanpipe.pipes import flag
from scanpipe.pipes import scancode
from scanpipe import policies

"""
A common compliance pattern for images is to store known licenses in a /licenses
Expand Down Expand Up @@ -123,3 +124,19 @@ def get_project_compliance_alerts(project, fail_level="error"):
}

return project_compliance_alerts

def add_clarity_compliance_to_summary(summary, project):
"""
Add clarity compliance alert to summary data.
Called from make_results_summary function.
"""

clarity_score = summary.get("license_clarity_score", {}).get("score")
if clarity_score is None:
return summary

clarity_policies = project.clarity_policy_index
clarity_compliance = policies.evaluate_clarity_compliance(clarity_score, clarity_policies)

summary["clarity_compliance_alert"] = clarity_compliance
return summary
7 changes: 7 additions & 0 deletions scanpipe/pipes/scancode.py
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ def make_results_summary(project, scan_results_location):
"""
from scanpipe.api.serializers import CodebaseResourceSerializer
from scanpipe.api.serializers import DiscoveredPackageSerializer
from scanpipe import policies

with open(scan_results_location) as f:
scan_data = json.load(f)
Expand Down Expand Up @@ -964,4 +965,10 @@ def make_results_summary(project, scan_results_location):
DiscoveredPackageSerializer(package).data for package in key_files_packages_qs
]

clarity_score = summary.get("license_clarity_score", {}).get("score")
if clarity_score is not None:
clarity_policies = project.clarity_policy_index
clarity_compliance = policies.evaluate_clarity_compliance(clarity_score, clarity_policies)
summary["clarity_compliance_alert"] = clarity_compliance

return summary
48 changes: 48 additions & 0 deletions scanpipe/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,27 @@ def validate_policies(policies_dict):
"The `license_policies` key is missing from provided policies data."
)

if "clarity_policies" in policies_dict:
clarity_policies = policies_dict["clarity_policies"]
if not isinstance(clarity_policies, list):
raise ValidationError("The `clarity_policies` must be a list.")

for policy in clarity_policies:
if not isinstance(policy, dict):
raise ValidationError("Each clarity policy must be a dictionary.")
if "threshold" not in policy:
raise ValidationError("Each clarity policy must have a 'threshold' field.")

threshold = policy["threshold"]
if isinstance(threshold, str):
try:
policy["threshold"] = float(threshold) if '.' in threshold else int(threshold)
except ValueError:
raise ValidationError(f"Clarity policy 'threshold' must be a valid number. Got: {threshold}")

if not isinstance(policy["threshold"], (int, float)):
raise ValidationError("Clarity policy 'threshold' must be a number.")

return True


Expand All @@ -63,3 +84,30 @@ def make_license_policy_index(policies_dict):

license_policies = policies_dict.get("license_policies", [])
return {policy.get("license_key"): policy for policy in license_policies}


def make_clarity_policy_index(policies_dict):
"""Return a list of clarity policies sorted by threshold (descending)."""
if "clarity_policies" not in policies_dict:
return []

clarity_policies = policies_dict.get("clarity_policies", [])
return sorted(clarity_policies, key=lambda p: p.get("threshold", 0), reverse=True)


def evaluate_clarity_compliance(clarity_score, clarity_policies):
"""
Evaluate clarity score against policies and return compliance alert.
Returns the most appropriate compliance alert based on the score.
"""
if not clarity_policies:
return ""

if clarity_score is None:
return "missing"

for policy in clarity_policies:
if clarity_score >= policy.get("threshold", 0):
return policy.get("compliance_alert", "")

return "error"
Loading