Skip to content

add management command to import all unit test sample scans#12700

Merged
valentijnscholten merged 2 commits into
DefectDojo:bugfixfrom
valentijnscholten:import-all-the-things
Jul 3, 2025
Merged

add management command to import all unit test sample scans#12700
valentijnscholten merged 2 commits into
DefectDojo:bugfixfrom
valentijnscholten:import-all-the-things

Conversation

@valentijnscholten

Copy link
Copy Markdown
Member

I needed a way to load some realistic data into Defect Dojo. Not just one or two big scans, but ideally a couple of products + engagements + tests.

As an experiment I created this management command that imports all unit test sample scans present in the repository.

It's a naive first implementation:

  • ignores subfolders
  • ignores errors during import
  • can only handle 1 scan_type per parser
  • has some copy and pasted code from elsewhere

I am committing it anyway as it might be useful for others or maybe other can improve upon it :-)

A similar approach could be just to have an end-to-end test in the test suite that imports all sample scans and ensures there are no errors ("smoke test").
It can also be used for users to just get the import process going and see how the system holds up and/or scales up.

There are about 20 files that needs special attention. Some for example are intentionally malformed, so should not be imported. What could work in the future maybe is to have a copanion file for the scan report named <scan_report>.json.properties or something that holds a json blob with instructions like {"skip_on_all_import": true} or {"scan_type": "parser2"}.
With smart defaults this would only be needed for these 20 "special" files.

@dryrunsecurity

dryrunsecurity Bot commented Jun 26, 2025

Copy link
Copy Markdown

DryRun Security

This pull request contains a hardcoded administrator account in a test utility file, which is marked as experimental and intended for unittest scenarios, with minimal security risk due to its specific context and clear warnings.

Hardcoded Administrator Account in dojo/management/commands/import_all_unittest_scans.py
Vulnerability Hardcoded Administrator Account
Description This is a valid concern in a test utility. While the code is marked as 'EXPERIMENTAL' and intended for unittest scenarios, hardcoding an admin username creates a potential security anti-pattern. However, since this is explicitly a unittest management command with clear experimental warnings, the risk is minimal in this specific context.

import json
import logging
import os
from datetime import datetime
from importlib import import_module
from importlib.util import find_spec
from inspect import isclass
from pathlib import Path
from django.core.management.base import BaseCommand
from django.urls import reverse
from django.utils import timezone
from rest_framework.authtoken.models import Token
from rest_framework.test import APIClient
import dojo.tools.factory
from dojo.models import Engagement, Product, Product_Type
from unittests.test_dashboard import User
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = (
"EXPERIMENTAL: May be changed/deprecated/removed without prior notice. "
"Command to import all scans available in unittests folder"
)
def add_arguments(self, parser):
parser.add_argument(
"--product-name-prefix",
type=lambda s: s if len(s) <= 250 else parser.error("product-name-prefix must be at most 250 characters"),
help="Prefix to use for product names, defaults to 'All scans <today>'. Max length 250 characters.",
)
parser.add_argument(
"--include-very-big-scans",
action="store_true",
default=False,
help="Include very big scans like jfrog_xray very_many_vulns.json (default: False)",
)
parser.add_argument("--tests-per-engagement", type=int, default=10, help="Number of tests per engagement before a new engagement is created, defaults to 10")
parser.add_argument("--engagements-per-product", type=int, default=50, help="Number of engagements per product before a new product is created, defaults to 50")
parser.add_argument("--products-per-product-type", type=int, default=15, help="Number of products per product type before a new product type is created, defaults to 15")
parser.add_argument("--number-of-runs", type=int, default=1, help="Number of times to run the import of all sample scans, defaults to 1")
def get_test_admin(self, *args, **kwargs):
return User.objects.get(username="admin")
def import_scan(self, payload, expected_http_status_code):
testuser = self.get_test_admin()
token = Token.objects.get(user=testuser)
self.client = APIClient()
self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key)
response = self.client.post(reverse("importscan-list"), payload)
if expected_http_status_code != response.status_code:
msg = f"Expected HTTP status code {expected_http_status_code}, got {response.status_code}: {response.content[:1000]}"
raise AssertionError(
msg,
)
return json.loads(response.content)
def import_scan_with_params(self, filename, scan_type="ZAP Scan", engagement=1, minimum_severity="Low", *, active=True, verified=False,
push_to_jira=None, endpoint_to_add=None, tags=None, close_old_findings=False, group_by=None, engagement_name=None,
product_name=None, product_type_name=None, auto_create_context=None, expected_http_status_code=201, test_title=None,
scan_date=None, service=None, force_active=True, force_verified=True):
with (Path("unittests/scans") / filename).open(encoding="utf-8") as testfile:
payload = {
"minimum_severity": minimum_severity,
"scan_type": scan_type,
"file": testfile,
"version": "1.0.1",
"close_old_findings": close_old_findings,
}
if active is not None:
payload["active"] = active
if verified is not None:
payload["verified"] = verified
if engagement:
payload["engagement"] = engagement
if engagement_name:
payload["engagement_name"] = engagement_name
if product_name:
payload["product_name"] = product_name
if product_type_name:
payload["product_type_name"] = product_type_name
if auto_create_context:
payload["auto_create_context"] = auto_create_context
if push_to_jira is not None:
payload["push_to_jira"] = push_to_jira
if endpoint_to_add is not None:
payload["endpoint_to_add"] = endpoint_to_add
if tags is not None:
payload["tags"] = tags
if group_by is not None:
payload["group_by"] = group_by
if test_title is not None:
payload["test_title"] = test_title
if scan_date is not None:
payload["scan_date"] = scan_date
if service is not None:
payload["service"] = service
return self.import_scan(payload, expected_http_status_code)
def import_all_unittest_scans(self, product_name_prefix=None, tests_per_engagement=10, engagements_per_product=50, products_per_product_type=15, *, include_very_big_scans=False, **kwargs):
logger.info(f"product_name_prefix: {product_name_prefix}, tests_per_engagement: {tests_per_engagement}, engagements_per_product: {engagements_per_product}, products_per_product_type: {products_per_product_type}")
product_type_prefix = "Sample scans " + datetime.now().strftime("%Y-%m-%d %H:%M:%S")
product_type_index = 1
product_index = 1
engagement_index = 1
tests_index = 1
error_count = 0
error_messages = {}
# iterate through the modules in the current package
package_dir = str(Path(dojo.tools.factory.__file__).resolve().parent)
for module_name in os.listdir(package_dir): # noqa: PTH208
if tests_index > tests_per_engagement:
tests_index = 1
engagement_index += 1
if engagement_index > engagements_per_product:
engagement_index = 1
product_index += 1
if product_index > products_per_product_type:
product_index = 1
product_type_index += 1
prod_type, _ = Product_Type.objects.get_or_create(name=product_type_prefix + f" {product_type_index}")
prod, _ = Product.objects.get_or_create(prod_type=prod_type, name=product_name_prefix + f" {product_type_index}:{product_index}", description="Sample scans for unittesting")
eng, _ = Engagement.objects.get_or_create(product=prod, name="Sample scan engagement" + f" {engagement_index}", target_start=timezone.now(), target_end=timezone.now())
# check if it's dir
if (Path(package_dir) / module_name).is_dir():
try:
# check if it's a Python module
if find_spec(f"dojo.tools.{module_name}.parser"):
# import the module and iterate through its attributes
module = import_module(f"dojo.tools.{module_name}.parser")
for attribute_name in dir(module):
attribute = getattr(module, attribute_name)
if isclass(attribute) and attribute_name.lower() == module_name.replace("_", "") + "parser":
logger.debug(f"Loading {module_name} parser")
scan_dir = Path("unittests") / "scans" / module_name
for scan_file in scan_dir.glob("*.json"):
if include_very_big_scans or scan_file.name != "very_many_vulns.json": # jfrog_xray file is huge and takes too long to import
try:
logger.info(f"Importing scan {scan_file.name} using {module_name} parser into {prod.name}:{eng.name}")
parser = attribute()
# with scan_file.open(encoding="utf-8") as f:
# findings = parser.get_findings(f, Test())
result = self.import_scan_with_params(
filename=module_name + "/" + scan_file.name,
scan_type=parser.get_scan_types()[0],
engagement=eng.id,
)
# logger.debug(f"Result of import: {result}")
# raise Exception(f"Scan {scan_file.name} is not expected to be imported, but it was.")
logger.debug(f"Imported findings from {module_name + scan_file.name}")
tests_index += 1
except Exception as e:
logger.error(f"Error importing scan {module_name + scan_file.name}: {e}")
error_count += 1
error_messages[module_name + "/" + scan_file.name] = result.get("message", str(e))
except:
logger.exception(f"failed to load {module_name}")
raise
logger.error(f"Error count: {error_count}")
for scan, message in error_messages.items():
logger.error(f"Error importing scan {scan}: {message}")
def handle(self, *args, **options):
logger.info("EXPERIMENTAL: This command may be changed/deprecated/removed without prior notice.")
for i in range(options.get("number_of_runs", 1)):
product_name_prefix = options.get("product_name_prefix")
if not product_name_prefix:
today = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
product_name_prefix = f"Sample scan product {i + 1} {today}"
self.import_all_unittest_scans(
product_name_prefix=product_name_prefix,
tests_per_engagement=options.get("tests_per_engagement"),
engagements_per_product=options.get("engagements_per_product"),
products_per_product_type=options.get("products_per_product_type"),
include_very_big_scans=options.get("include_very_big_scans"),
)


All finding details can be found in the DryRun Security Dashboard.

@valentijnscholten valentijnscholten added this to the 2.48.0 milestone Jun 26, 2025

@mtesauro mtesauro left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approved

@Maffooch Maffooch requested review from dogboat and hblankenship July 2, 2025 15:39
@valentijnscholten valentijnscholten merged commit ffffe7d into DefectDojo:bugfix Jul 3, 2025
78 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants