Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions scanpipe/migrations/0075_codebaseresource_parent_path_and_more.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Generated by Django 5.1.9 on 2025-06-16 17:42

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the `parent_path` field and its lookup index to CodebaseResource."""

    dependencies = [
        ("scanpipe", "0074_discovered_license_models"),
    ]

    operations = [
        migrations.AddField(
            model_name="codebaseresource",
            name="parent_path",
            field=models.CharField(
                blank=True,
                # The column is not nullable, so top-level resources hold an
                # empty string, never None. This text must stay identical to
                # the help_text declared on the model field, otherwise the
                # migration state and the model state drift apart.
                help_text=(
                    "The path of the resource's parent directory. "
                    "Set to empty string for top-level (root) resources. "
                    "Used to efficiently retrieve a directory's contents."
                ),
                max_length=2000,
            ),
        ),
        # Composite index backing "list the content of a directory" lookups,
        # which filter on the project and the parent path together.
        migrations.AddIndex(
            model_name="codebaseresource",
            index=models.Index(
                fields=["project", "parent_path"],
                name="scanpipe_co_project_008448_idx",
            ),
        ),
    ]
20 changes: 19 additions & 1 deletion scanpipe/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2739,6 +2739,17 @@ class CodebaseResource(
'Eg.: "/usr/bin/bash" for a path of "tarball-extract/rootfs/usr/bin/bash"'
),
)

# Denormalized path of the parent directory, indexed together with `project`
# in `Meta.indexes`. Top-level resources store an empty string.
parent_path = models.CharField(
    blank=True,
    max_length=2000,
    help_text=_(
        "The path of the resource's parent directory. "
        "Set to empty string for top-level (root) resources. "
        "Used to efficiently retrieve a directory's contents."
    ),
)

status = models.CharField(
blank=True,
max_length=50,
Expand Down Expand Up @@ -2832,6 +2843,7 @@ class Meta:
models.Index(fields=["compliance_alert"]),
models.Index(fields=["is_binary"]),
models.Index(fields=["is_text"]),
models.Index(fields=["project", "parent_path"]),
]
constraints = [
models.UniqueConstraint(
Expand All @@ -2844,6 +2856,11 @@ class Meta:
def __str__(self):
    """
    Return the resource path as a string.

    `path` is not guaranteed to be a `str` on an in-memory instance, for
    example when the resource was built from a `pathlib.Path`. `__str__` must
    return a real `str`, otherwise Python raises a TypeError.
    """
    return str(self.path)

def save(self, *args, **kwargs):
    """
    Save the resource, filling in `parent_path` when it is not set yet.

    The value is derived from `path` through `parent_directory()`. Resources
    for which no parent directory is returned get an empty string.
    """
    should_derive = bool(self.path) and not self.parent_path
    if should_derive:
        derived = self.parent_directory()
        self.parent_path = derived if derived else ""
    super().save(*args, **kwargs)

def get_absolute_url(self):
    """Return the URL of the "resource_detail" view for this resource."""
    # The route is resolved from the project slug followed by the resource path.
    url_args = [self.project.slug, self.path]
    return reverse("resource_detail", args=url_args)

Expand Down Expand Up @@ -2914,7 +2931,8 @@ def get_path_segments_with_subpath(self):

def parent_directory(self):
    """
    Return the parent path for this CodebaseResource or None.

    None is returned when the computed parent path is empty, which is the case
    for top-level resources.
    """
    # The bare `parent_directory` name resolves to the module-level helper,
    # not to this method. `path` is coerced to `str` first since it is not
    # guaranteed to be one on an in-memory instance (it may be a `Path`).
    parent_path = parent_directory(str(self.path), with_trail=False)
    return parent_path or None

def has_parent(self):
"""
Expand Down
6 changes: 6 additions & 0 deletions scanpipe/pipes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ def make_codebase_resource(project, location, save=True, **extra_fields):
from scanpipe.pipes import flag

relative_path = Path(location).relative_to(project.codebase_path)
parent_path = str(relative_path.parent)

if parent_path == ".":
parent_path = ""

try:
resource_data = scancode.get_resource_info(location=str(location))
except OSError as error:
Expand All @@ -92,6 +97,7 @@ def make_codebase_resource(project, location, save=True, **extra_fields):
codebase_resource = CodebaseResource(
project=project,
path=relative_path,
parent_path=parent_path,
**resource_data,
)

Expand Down
8 changes: 8 additions & 0 deletions scanpipe/pipes/rootfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ def get_res(parent, fname):
rootfs_path=rootfs_path,
)

# Explicitly yields the root directory as a resource when `with_dir` is True
if with_dir:
Comment thread
tdruez marked this conversation as resolved.
rootfs_path = "/"
yield Resource(
location=location,
rootfs_path=rootfs_path,
)

for top, dirs, files in os.walk(location):
for f in files:
yield get_res(parent=top, fname=f)
Expand Down
132 changes: 73 additions & 59 deletions scanpipe/pipes/scancode.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,80 +900,94 @@ def get_virtual_codebase(project, input_location):
return VirtualCodebase(input_location, temp_dir=str(temp_path), max_in_memory=0)


def create_codebase_resource(project, scanned_resource):
    """
    Create a CodebaseResource entry from ScanCode scanned data.

    The resource is fetched or created from the `project` and the root-stripped
    path of the `scanned_resource`, so an existing entry is left untouched.
    The package assignments, license detections, license clues, and
    package-level license detections of the `scanned_resource` are then
    recorded on the `project`.
    """
    resource_data = {}

    for field in CodebaseResource._meta.fields:
        # Do not include the path as provided by the scanned_resource since it
        # includes the "root". The `get_path` method is used instead.
        # The `parent_path` is derived from that root-stripped path below.
        if field.name in ["path", "parent_path"]:
            continue
        value = getattr(scanned_resource, field.name, None)
        if value is not None:
            resource_data[field.name] = value

    resource_type = "FILE" if scanned_resource.is_file else "DIRECTORY"
    resource_data["type"] = CodebaseResource.Type[resource_type]
    resource_path = scanned_resource.get_path(strip_root=True)

    # The parent of a top-level path is "." for `pathlib`; top-level resources
    # are stored with an empty `parent_path` instead.
    parent_path = str(Path(resource_path).parent)
    if parent_path == ".":
        parent_path = ""
    resource_data["parent_path"] = parent_path

    codebase_resource, _ = CodebaseResource.objects.get_or_create(
        project=project,
        path=resource_path,
        defaults=resource_data,
    )

    # Handle package assignments
    for_packages = getattr(scanned_resource, "for_packages", [])
    for package_uid in for_packages:
        logger.debug(f"Assign {package_uid} to {codebase_resource}")
        package = project.discoveredpackages.get(package_uid=package_uid)
        set_codebase_resource_for_package(
            codebase_resource=codebase_resource,
            discovered_package=package,
        )

    # Handle license detections
    license_detections = getattr(scanned_resource, "license_detections", [])
    for detection_data in license_detections:
        detection_identifier = detection_data.get("identifier")
        pipes.update_or_create_license_detection(
            project=project,
            detection_data=detection_data,
            resource_path=resource_path,
            count_detection=False,
        )
        logger.debug(f"Add {codebase_resource} to {detection_identifier}")

    # Handle license clues
    license_clues = getattr(scanned_resource, "license_clues", [])
    for clue_data in license_clues:
        pipes.update_or_create_license_detection(
            project=project,
            detection_data=clue_data,
            resource_path=resource_path,
            is_license_clue=True,
        )
        logger.debug(f"Add license clue at {codebase_resource}")

    # Handle package data
    packages = getattr(scanned_resource, "package_data", [])
    for package_data in packages:
        # Build a new list rather than extending the one stored in
        # `package_data`: extending in place would silently add the "other"
        # detections to the scanned data itself. A missing or None value is
        # treated as no detections.
        package_detections = [
            *(package_data.get("license_detections") or []),
            *(package_data.get("other_license_detections") or []),
        ]
        for detection_data in package_detections:
            detection_identifier = detection_data.get("identifier")
            pipes.update_or_create_license_detection(
                project=project,
                detection_data=detection_data,
                resource_path=resource_path,
                count_detection=False,
                from_package=True,
            )
            logger.debug(f"Add {codebase_resource} to {detection_identifier}")


def create_codebase_resources(project, scanned_codebase):
    """
    Save the resources of a ScanCode `scanned_codebase` scancode.resource.Codebase
    object to the database as a CodebaseResource of the `project`.
    This function can be used to extend an existing `project` Codebase with new
    CodebaseResource objects as the existing objects (based on the `path`) will be
    skipped.
    """
    # The root of the scanned codebase is skipped; only its content is saved.
    for scanned_resource in scanned_codebase.walk(skip_root=True):
        create_codebase_resource(project, scanned_resource)


def create_discovered_packages(project, scanned_codebase):
Expand Down
36 changes: 36 additions & 0 deletions scanpipe/tests/data/rootfs/basic-rootfs_root_filesystems.json
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,42 @@
],
"dependencies": [],
"files": [
{
"path": "basic-rootfs.tar.gz-extract",
"type": "directory",
"name": "basic-rootfs.tar.gz-extract",
"status": "scanned",
"for_packages": [],
"tag": "",
"extension": ".tar.gz-extract",
"programming_language": "",
"detected_license_expression": "",
"detected_license_expression_spdx": "",
"license_detections": [],
"license_clues": [],
"percentage_of_license_text": null,
"copyrights": [],
"holders": [],
"authors": [],
"package_data": [],
"emails": [],
"urls": [],
"md5": "",
"sha1": "",
"sha256": "",
"sha512": "",
"sha1_git": "",
"is_binary": false,
"is_text": false,
"is_archive": false,
"is_media": false,
"is_legal": false,
"is_manifest": false,
"is_readme": false,
"is_top_level": false,
"is_key_file": false,
"extra_data": {}
},
{
"path": "basic-rootfs.tar.gz-extract/etc",
"type": "directory",
Expand Down
18 changes: 18 additions & 0 deletions scanpipe/tests/pipes/test_scancode.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,3 +723,21 @@ def test_scanpipe_scancode_resolve_dependencies_no_requirements(self):
resolved_dep = project1.discovereddependencies.get(name="bluebird")
self.assertEqual(resolved_dep, dep_2)
self.assertEqual(resolved_dep.resolved_to_package, pkg_1)

def test_scanpipe_pipes_scancode_scan_single_package_correct_parent_path(self):
    # Run the "scan_single_package" pipeline on an npm tarball and check the
    # `parent_path` stored on the resulting resources.
    project = Project.objects.create(name="Analysis")
    archive_location = self.data / "scancode" / "is-npm-1.0.0.tgz"
    project.copy_input_from(archive_location)
    pipeline_run = project.add_pipeline("scan_single_package")
    pipeline_instance = pipeline_run.make_pipeline_instance()
    exitcode, out = pipeline_instance.execute()

    self.assertEqual(0, exitcode, msg=out)
    self.assertEqual(4, project.codebaseresources.count())

    # The top-level directory must have an empty parent path, and must not
    # point to the "codebase" work directory.
    top_level_dir = project.codebaseresources.get(path="package")
    self.assertEqual("", top_level_dir.parent_path)
    self.assertNotEqual("codebase", top_level_dir.parent_path)

    nested_file = project.codebaseresources.get(path="package/index.js")
    self.assertEqual("package", nested_file.parent_path)
10 changes: 10 additions & 0 deletions scanpipe/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,16 @@ def test_scanpipe_can_compute_compliance_alert_for_license_exceptions(self):
resource.update(detected_license_expression=license_expression)
self.assertEqual("warning", resource.compute_compliance_alert())

def test_scanpipe_codebase_root_parent_path(self):
    # A resource created at the top level gets an empty `parent_path`.
    top_level = self.project1.codebaseresources.create(path="file")
    self.assertEqual("", top_level.parent_path)
Comment thread
tdruez marked this conversation as resolved.

def test_scanpipe_codebase_regular_parent_path(self):
    # A nested resource gets the path of its containing directory.
    nested = self.project1.codebaseresources.create(path="dir1/dir2/file")
    self.assertEqual("dir1/dir2", nested.parent_path)

def test_scanpipe_scan_fields_model_mixin_methods(self):
expected = [
"detected_license_expression",
Expand Down
Loading