class Migration(migrations.Migration):
    """
    Add the ``parent_path`` column to ``CodebaseResource`` together with a
    composite ``(project, parent_path)`` index.
    """

    dependencies = [
        ("scanpipe", "0074_discovered_license_models"),
    ]

    operations = [
        migrations.AddField(
            model_name="codebaseresource",
            name="parent_path",
            field=models.CharField(
                blank=True,
                # Keep this text identical to the model field's help_text: the
                # column is a non-null CharField, so top-level resources store
                # an empty string (not None), and any mismatch makes the
                # autodetector report a pending AlterField migration.
                help_text=(
                    "The path of the resource's parent directory. "
                    "Set to empty string for top-level (root) resources. "
                    "Used to efficiently retrieve a directory's contents."
                ),
                max_length=2000,
            ),
        ),
        migrations.AddIndex(
            model_name="codebaseresource",
            index=models.Index(
                fields=["project", "parent_path"],
                name="scanpipe_co_project_008448_idx",
            ),
        ),
    ]
def save(self, *args, **kwargs):
    """
    Save this resource, deriving ``parent_path`` from ``path`` when it is unset.

    The value is only computed when ``path`` is set and ``parent_path`` is
    empty, so a ``parent_path`` explicitly provided by the caller is kept.
    When the caller restricts the write with a non-empty ``update_fields``,
    the freshly derived ``parent_path`` is added to it; otherwise the value
    would be set on the instance but never written to the database.
    """
    if self.path and not self.parent_path:
        self.parent_path = self.parent_directory() or ""

        update_fields = kwargs.get("update_fields")
        # An empty derived value is what the column already holds, so there is
        # nothing to persist. An empty ``update_fields`` is left untouched to
        # preserve its "save nothing" meaning.
        if self.parent_path and update_fields:
            kwargs["update_fields"] = {"parent_path", *update_fields}

    super().save(*args, **kwargs)
def create_codebase_resource(project, scanned_resource):
    """
    Create a CodebaseResource entry from ScanCode scanned data.

    The resource is looked up by ``(project, path)`` and only created when
    missing. Package assignments, license detections, license clues, and the
    license detections found in package data are then recorded for the
    resource path.
    """
    resource_data = {}

    for field in CodebaseResource._meta.fields:
        # Do not include the path as provided by the scanned_resource since it
        # includes the "root". The `get_path` method is used instead, and the
        # `parent_path` is derived from that root-stripped path below.
        if field.name in ("path", "parent_path"):
            continue
        value = getattr(scanned_resource, field.name, None)
        if value is not None:
            resource_data[field.name] = value

    resource_type = "FILE" if scanned_resource.is_file else "DIRECTORY"
    resource_data["type"] = CodebaseResource.Type[resource_type]
    resource_path = scanned_resource.get_path(strip_root=True)

    # `Path.parent` yields "." for a single-segment path: store top-level
    # resources with an empty parent path instead.
    parent_path = str(Path(resource_path).parent)
    if parent_path == ".":
        parent_path = ""
    resource_data["parent_path"] = parent_path

    codebase_resource, _ = CodebaseResource.objects.get_or_create(
        project=project,
        path=resource_path,
        defaults=resource_data,
    )

    # Handle package assignments
    for_packages = getattr(scanned_resource, "for_packages", [])
    for package_uid in for_packages:
        logger.debug(f"Assign {package_uid} to {codebase_resource}")
        package = project.discoveredpackages.get(package_uid=package_uid)
        set_codebase_resource_for_package(
            codebase_resource=codebase_resource,
            discovered_package=package,
        )

    # Handle license detections
    license_detections = getattr(scanned_resource, "license_detections", [])
    for detection_data in license_detections:
        detection_identifier = detection_data.get("identifier")
        pipes.update_or_create_license_detection(
            project=project,
            detection_data=detection_data,
            resource_path=resource_path,
            count_detection=False,
        )
        logger.debug(f"Add {codebase_resource} to {detection_identifier}")

    # Handle license clues
    license_clues = getattr(scanned_resource, "license_clues", [])
    for clue_data in license_clues:
        pipes.update_or_create_license_detection(
            project=project,
            detection_data=clue_data,
            resource_path=resource_path,
            is_license_clue=True,
        )
        logger.debug(f"Add license clue at {codebase_resource}")

    # Handle package data
    packages = getattr(scanned_resource, "package_data", [])
    for package_data in packages:
        # Build a new list rather than calling `extend()` on the list owned by
        # `package_data`: extending in place would merge the "other" detections
        # into the scanned resource's own `license_detections` and duplicate
        # them each time the same resource is processed again.
        license_detections = [
            *(package_data.get("license_detections") or []),
            *(package_data.get("other_license_detections") or []),
        ]
        for detection_data in license_detections:
            detection_identifier = detection_data.get("identifier")
            pipes.update_or_create_license_detection(
                project=project,
                detection_data=detection_data,
                resource_path=resource_path,
                count_detection=False,
                from_package=True,
            )
            logger.debug(f"Add {codebase_resource} to {detection_identifier}")


def create_codebase_resources(project, scanned_codebase):
    """
    Save the resources of a ScanCode `scanned_codebase` scancode.resource.Codebase
    object to the database as a CodebaseResource of the `project`.
    This function can be used to extend an existing `project` Codebase with new
    CodebaseResource objects as the existing objects (based on the `path`) will be
    skipped.
    """
    for scanned_resource in scanned_codebase.walk(skip_root=True):
        create_codebase_resource(project, scanned_resource)
# --- scanpipe/tests/pipes/test_scancode.py ---


def test_scanpipe_pipes_scancode_scan_single_package_correct_parent_path(self):
    """Resources created by `scan_single_package` get root-relative parent paths."""
    project1 = Project.objects.create(name="Analysis")
    input_location = self.data / "scancode" / "is-npm-1.0.0.tgz"
    project1.copy_input_from(input_location)
    run = project1.add_pipeline("scan_single_package")
    pipeline = run.make_pipeline_instance()
    exitcode, out = pipeline.execute()

    self.assertEqual(0, exitcode, msg=out)
    self.assertEqual(4, project1.codebaseresources.count())

    # The top-level "package" directory has no parent: its parent path must be
    # empty, and in particular must not be the "codebase" directory name.
    root = project1.codebaseresources.get(path="package")
    self.assertEqual("", root.parent_path)
    self.assertNotEqual("codebase", root.parent_path)

    file1 = project1.codebaseresources.get(path="package/index.js")
    self.assertEqual("package", file1.parent_path)


# --- scanpipe/tests/test_models.py ---


def test_scanpipe_codebase_root_parent_path(self):
    """A resource with a single-segment path is saved with an empty parent path."""
    resource1 = self.project1.codebaseresources.create(path="file")

    self.assertEqual("", resource1.parent_path)


def test_scanpipe_codebase_regular_parent_path(self):
    """A nested resource is saved with its directory as parent path."""
    resource2 = self.project1.codebaseresources.create(path="dir1/dir2/file")

    self.assertEqual("dir1/dir2", resource2.parent_path)


# --- scanpipe/tests/test_pipelines.py ---


def _run_scan_codebase_on_is_npm(self):
    """Run `scan_codebase` on the is-npm archive and return the project."""
    project1 = make_project()

    input_location = self.data / "scancode" / "is-npm-1.0.0.tgz"
    project1.copy_input_from(input_location)

    run = project1.add_pipeline("scan_codebase")
    pipeline = run.make_pipeline_instance()

    exitcode, out = pipeline.execute()
    self.assertEqual(0, exitcode, msg=out)
    return project1


def test_scanpipe_scan_codebase_creates_top_level_paths(self):
    """Top-level resources are the ones stored with an empty parent path."""
    project1 = self._run_scan_codebase_on_is_npm()

    expected_top_level_paths = ["is-npm-1.0.0.tgz", "is-npm-1.0.0.tgz-extract"]

    # Order explicitly so the list comparison does not depend on the model's
    # default ordering.
    top_level_paths = list(
        project1.codebaseresources.filter(parent_path="")
        .order_by("path")
        .values_list("path", flat=True)
    )

    self.assertListEqual(top_level_paths, expected_top_level_paths)


def test_scanpipe_scan_codebase_creates_parent_path_field(self):
    """Filtering on `parent_path` returns the direct children of a directory."""
    project1 = self._run_scan_codebase_on_is_npm()

    expected_top_level_paths = ["is-npm-1.0.0.tgz", "is-npm-1.0.0.tgz-extract"]
    expected_nested_paths = [
        "is-npm-1.0.0.tgz-extract/package/index.js",
        "is-npm-1.0.0.tgz-extract/package/package.json",
        "is-npm-1.0.0.tgz-extract/package/readme.md",
    ]

    top_level_paths = list(
        project1.codebaseresources.filter(parent_path="")
        .order_by("path")
        .values_list("path", flat=True)
    )
    self.assertListEqual(top_level_paths, expected_top_level_paths)

    nested_paths = list(
        project1.codebaseresources.filter(
            parent_path="is-npm-1.0.0.tgz-extract/package"
        )
        .order_by("path")
        .values_list("path", flat=True)
    )
    self.assertListEqual(nested_paths, expected_nested_paths)
test_scanpipe_inspect_packages_creates_packages_npm(self): pipeline_name = "inspect_packages" project1 = make_project() @@ -1209,7 +1263,7 @@ def test_scanpipe_rootfs_pipeline_integration(self): exitcode, out = pipeline.execute() self.assertEqual(0, exitcode, msg=out) - self.assertEqual(16, project1.codebaseresources.count()) + self.assertEqual(17, project1.codebaseresources.count()) self.assertEqual(2, project1.discoveredpackages.count()) self.assertEqual(0, project1.discovereddependencies.count())