diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000000..64279977619 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,249 @@ +# DefectDojo Development Guide + +## Project Overview + +DefectDojo is a Django application (`dojo` app) for vulnerability management. The codebase is undergoing a modular reorganization to move from monolithic files toward self-contained domain modules. + +## Module Reorganization + +### Reference Pattern: `dojo/url/` + +All domain modules should match the structure of `dojo/url/`. This is the canonical example of a fully reorganized module. + +``` +dojo/{module}/ +├── __init__.py # import dojo.{module}.admin # noqa: F401 +├── models.py # Domain models, constants, factory methods +├── admin.py # @admin.register() for the module's models +├── services.py # Business logic (no HTTP concerns) +├── queries.py # Complex DB aggregations/annotations +├── signals.py # Django signal handlers +├── [manager.py] # Custom QuerySet/Manager if needed +├── [validators.py] # Field-level validators if needed +├── [helpers.py] # Async task wrappers, tag propagation, etc. +├── ui/ +│ ├── __init__.py # Empty +│ ├── forms.py # Django ModelForms +│ ├── filters.py # UI-layer django-filter classes +│ ├── views.py # Thin view functions — delegates to services.py +│ └── urls.py # URL routing +└── api/ + ├── __init__.py # path = "{module}" + ├── serializer.py # DRF serializers + ├── views.py # API ViewSets — delegates to services.py + ├── filters.py # API-layer filters + └── urls.py # add_{module}_urls(router) registration +``` + +### Architecture Principles + + +**services.py is the critical layer**: Both `ui/views.py` and `api/views.py` call `services.py` for business logic. Services accept domain objects and primitives — never request/response objects, forms, or serializers. + +**Backward-compatible re-exports**: When moving code out of monolithic files (`dojo/models.py`, `dojo/forms.py`, `dojo/filters.py`, `dojo/api_v2/serializers.py`, `dojo/api_v2/views.py`), always leave a re-export at the original location: +```python +from dojo.{module}.models import {Model} # noqa: F401 -- backward compat +``` +Never remove re-exports until all consumers are updated in a dedicated cleanup pass. + +### Current State + +Modules in various stages of reorganization: + +| Module | models.py | services.py | ui/ | api/ | Status | +|--------|-----------|-------------|-----|------|--------| +| **url** | In module | N/A | Done | Done | **Complete** | +| **location** | In module | N/A | N/A | Done | **Complete** | +| **product_type** | In dojo/models.py | Missing | Partial (views at root) | In dojo/api_v2/ | Needs work | +| **test** | In dojo/models.py | Missing | Partial (views at root) | In dojo/api_v2/ | Needs work | +| **engagement** | In dojo/models.py | Partial (32 lines) | Partial (views at root) | In dojo/api_v2/ | Needs work | +| **product** | In dojo/models.py | Missing | Partial (views at root) | In dojo/api_v2/ | Needs work | +| **finding** | In dojo/models.py | Missing | Partial (views at root) | In dojo/api_v2/ | Needs work | + +### Monolithic Files Being Decomposed + +These files still contain code for multiple modules. Extract code to the target module's subdirectory and leave a re-export stub. + +- `dojo/models.py` (4,973 lines) — All model definitions +- `dojo/forms.py` (4,127 lines) — All Django forms +- `dojo/filters.py` (4,016 lines) — All UI and API filter classes +- `dojo/api_v2/serializers.py` (3,387 lines) — All DRF serializers +- `dojo/api_v2/views.py` (3,519 lines) — All API viewsets + +--- + +## Reorganization Playbook + +When asked to reorganize a module, follow these phases in order. Each phase should be independently verifiable. + +### Phase 0: Pre-Flight (Read-Only) + +Before any changes, identify all code to extract: + +```bash +# 1. Model classes and line ranges in dojo/models.py +grep -n "class {Model}" dojo/models.py + +# 2. Form classes in dojo/forms.py +grep -n "class.*{Module}" dojo/forms.py +grep -n "model = {Model}" dojo/forms.py + +# 3. Filter classes in dojo/filters.py +grep -n "class.*{Module}\|class.*{Model}" dojo/filters.py + +# 4. Serializer classes +grep -n "class.*{Model}" dojo/api_v2/serializers.py + +# 5. ViewSet classes +grep -n "class.*{Model}\|class.*{Module}" dojo/api_v2/views.py + +# 6. Admin registrations +grep -n "admin.site.register({Model}" dojo/models.py + +# 7. All import sites (to verify backward compat) +grep -rn "from dojo.models import.*{Model}" dojo/ unittests/ + +# 8. Business logic in current views +# Scan dojo/{module}/views.py for: .save(), .delete(), create_notification(), +# jira_helper.*, dojo_dispatch_task(), multi-model workflows +``` + +### Phase 1: Extract Models + +1. Create `dojo/{module}/models.py` with the model class(es) and associated constants +2. Create `dojo/{module}/admin.py` with `admin.site.register()` calls (remove from `dojo/models.py`) +3. Update `dojo/{module}/__init__.py` to `import dojo.{module}.admin # noqa: F401` +4. Add re-exports in `dojo/models.py` +5. Remove original model code (keep re-export line) + +**Import rules for models.py:** +- Upward FKs (e.g., Test -> Engagement): import from `dojo.models` if not yet extracted, or `dojo.{module}.models` if already extracted +- Downward references (e.g., Product_Type querying Finding): use lazy imports inside method bodies +- Shared utilities (`copy_model_util`, `_manage_inherited_tags`, `get_current_date`, etc.): import from `dojo.models` +- Do NOT set `app_label` in Meta — all models inherit `dojo` app_label automatically + +**Verify:** +```bash +python manage.py check +python manage.py makemigrations --check +python -c "from dojo.{module}.models import {Model}" +python -c "from dojo.models import {Model}" +``` + +### Phase 2: Extract Services + +Create `dojo/{module}/services.py` with business logic extracted from UI views. + +**What belongs in services.py:** +- State transitions (close, reopen, status changes) +- Multi-step creation/update workflows +- External integration calls (JIRA, GitHub) +- Notification dispatching +- Copy/clone operations +- Bulk operations +- Merge operations + +**What stays in views:** +- HTTP request/response handling +- Form instantiation and validation +- Serialization/deserialization +- Authorization checks (`@user_is_authorized`, `user_has_permission_or_403`) +- Template rendering, redirects +- Pagination, breadcrumbs + +**Service function pattern:** +```python +def close_engagement(engagement: Engagement, user: User) -> Engagement: + engagement.active = False + engagement.status = "Completed" + engagement.save() + if jira_helper.get_jira_project(engagement): + dojo_dispatch_task(jira_helper.close_epic, engagement.id, push_to_jira=True) + return engagement +``` + +Update UI views and API viewsets to call the service instead of containing logic inline. + +### Phase 3: Extract Forms to `ui/forms.py` + +1. Create `dojo/{module}/ui/__init__.py` (empty) +2. Create `dojo/{module}/ui/forms.py` — move form classes from `dojo/forms.py` +3. Add re-exports in `dojo/forms.py` + +### Phase 4: Extract UI Filters to `ui/filters.py` + +1. Create `dojo/{module}/ui/filters.py` — move module-specific filters from `dojo/filters.py` +2. Shared base classes (`DojoFilter`, `DateRangeFilter`, `ReportBooleanFilter`) stay in `dojo/filters.py` +3. Add re-exports in `dojo/filters.py` + +### Phase 5: Move UI Views/URLs into `ui/` + +1. Move `dojo/{module}/views.py` -> `dojo/{module}/ui/views.py` +2. Move `dojo/{module}/urls.py` -> `dojo/{module}/ui/urls.py` +3. Update URL imports: + - product: update `dojo/asset/urls.py` + - product_type: update `dojo/organization/urls.py` + - others: update the include in `dojo/urls.py` + +### Phase 6: Extract API Serializers to `api/serializer.py` + +1. Create `dojo/{module}/api/__init__.py` with `path = "{module}"` +2. Create `dojo/{module}/api/serializer.py` — move from `dojo/api_v2/serializers.py` +3. Add re-exports in `dojo/api_v2/serializers.py` + +### Phase 7: Extract API Filters to `api/filters.py` + +1. Create `dojo/{module}/api/filters.py` — move `Api{Model}Filter` from `dojo/filters.py` +2. Add re-exports + +### Phase 8: Extract API ViewSets to `api/views.py` + +1. Create `dojo/{module}/api/views.py` — move from `dojo/api_v2/views.py` +2. Add re-exports in `dojo/api_v2/views.py` + +### Phase 9: Extract API URL Registration + +1. Create `dojo/{module}/api/urls.py`: + ```python + from dojo.{module}.api import path + from dojo.{module}.api.views import {ViewSet} + + def add_{module}_urls(router): + router.register(path, {ViewSet}, path) + return router + ``` +2. Update `dojo/urls.py` — replace `v2_api.register(...)` with `add_{module}_urls(v2_api)` + +### After Each Phase: Verify + +```bash +python manage.py check +python manage.py makemigrations --check +python -m pytest unittests/ -x --timeout=120 +``` + +--- + +## Cross-Module Dependencies + +The model hierarchy is: Product_Type -> Product -> Engagement -> Test -> Finding + +Extract in this order (top to bottom) so that upward FKs can import from already-extracted modules. The recommended order is: product_type, test, engagement, product, finding. + +For downward references (e.g., Product_Type's cached properties querying Finding), always use lazy imports: +```python +@cached_property +def critical_present(self): + from dojo.models import Finding # lazy import + return Finding.objects.filter(test__engagement__product__prod_type=self, severity="Critical").exists() +``` + +--- + +## Key Technical Details + +- **Single Django app**: Everything is under `app_label = "dojo"`. Moving models to subdirectories does NOT require migration changes. +- **Model discovery**: Triggered by `__init__.py` importing `admin.py`, which imports `models.py`. This is the same chain `dojo/url/` uses. +- **Signal registration**: Handled in `dojo/apps.py` via `import dojo.{module}.signals`. Already set up for test, engagement, product, product_type. +- **Watson search**: Uses `self.get_model("Product")` in `apps.py` — works via Django's model registry regardless of file location. +- **Admin registration**: Currently at the bottom of `dojo/models.py` (lines 4888-4973). Must be moved to `{module}/admin.py` and removed from `dojo/models.py` to avoid `AlreadyRegistered` errors. diff --git a/components/package.json b/components/package.json index f224ee344bd..72aabbff418 100644 --- a/components/package.json +++ b/components/package.json @@ -1,6 +1,6 @@ { "name": "defectdojo", - "version": "2.57.1", + "version": "2.57.2", "license" : "BSD-3-Clause", "private": true, "dependencies": { diff --git a/dojo/__init__.py b/dojo/__init__.py index 826418a495f..7fda45b24f0 100644 --- a/dojo/__init__.py +++ b/dojo/__init__.py @@ -4,6 +4,6 @@ # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa: F401 -__version__ = "2.57.1" +__version__ = "2.57.2" __url__ = "https://github.com/DefectDojo/django-DefectDojo" __docs__ = "https://documentation.defectdojo.com" diff --git a/dojo/announcement/os_message.py b/dojo/announcement/os_message.py new file mode 100644 index 00000000000..dfdb9288710 --- /dev/null +++ b/dojo/announcement/os_message.py @@ -0,0 +1,119 @@ +import logging + +import bleach +import markdown +import requests +from django.core.cache import cache + +logger = logging.getLogger(__name__) + +BUCKET_URL = "https://storage.googleapis.com/defectdojo-os-messages-prod/open_source_message.md" +CACHE_SECONDS = 3600 +HTTP_TIMEOUT_SECONDS = 2 +CACHE_KEY = "os_message:v1" + +INLINE_TAGS = ["strong", "em", "a"] +INLINE_ATTRS = {"a": ["href", "title"]} + +# Keep BLOCK_TAGS / BLOCK_ATTRS in sync with the DaaS publisher's +# MARKDOWNIFY["default"]["WHITELIST_TAGS"] / WHITELIST_ATTRS so previews +# on DaaS and rendering in OSS stay byte-identical. +BLOCK_TAGS = [ + "p", "ul", "ol", "li", "a", "strong", "em", "code", "pre", + "blockquote", "h2", "h3", "h4", "hr", "br", "b", "i", + "abbr", "acronym", +] +BLOCK_ATTRS = { + "a": ["href", "title"], + "abbr": ["title"], + "acronym": ["title"], +} + +_MISS = object() + + +def fetch_os_message(): + cached = cache.get(CACHE_KEY, default=_MISS) + if cached is not _MISS: + return cached + + try: + response = requests.get(BUCKET_URL, timeout=HTTP_TIMEOUT_SECONDS) + except Exception: + logger.debug("os_message: fetch failed", exc_info=True) + cache.set(CACHE_KEY, None, CACHE_SECONDS) + return None + + if response.status_code != 200 or not response.text.strip(): + cache.set(CACHE_KEY, None, CACHE_SECONDS) + return None + + cache.set(CACHE_KEY, response.text, CACHE_SECONDS) + return response.text + + +def _strip_outer_p(html): + stripped = html.strip() + if stripped.startswith("

") and stripped.endswith("

"): + return stripped[3:-4] + return stripped + + +def parse_os_message(text): + lines = text.splitlines() + + headline_source = None + body_start = None + for index, line in enumerate(lines): + if line.startswith("# "): + headline_source = line[2:].strip() + body_start = index + 1 + break + + if not headline_source: + return None + + headline_source = headline_source[:100] + headline_rendered = markdown.markdown(headline_source) + headline_cleaned = bleach.clean( + headline_rendered, + tags=INLINE_TAGS, + attributes=INLINE_ATTRS, + strip=True, + ) + headline_html = _strip_outer_p(headline_cleaned) + + expanded_html = None + expanded_marker = "## Expanded Message" + expanded_body_lines = None + for offset, line in enumerate(lines[body_start:], start=body_start): + if line.strip() == expanded_marker: + expanded_body_lines = lines[offset + 1:] + break + + if expanded_body_lines is not None: + expanded_source = "\n".join(expanded_body_lines).strip() + if expanded_source: + expanded_rendered = markdown.markdown( + expanded_source, + extensions=["extra", "fenced_code", "nl2br"], + ) + expanded_html = bleach.clean( + expanded_rendered, + tags=BLOCK_TAGS, + attributes=BLOCK_ATTRS, + strip=True, + ) + + return {"message": headline_html, "expanded_html": expanded_html} + + +def get_os_banner(): + try: + text = fetch_os_message() + if not text: + return None + return parse_os_message(text) + except Exception: + logger.debug("os_message: get_os_banner failed", exc_info=True) + return None diff --git a/dojo/announcement/signals.py b/dojo/announcement/signals.py index dedd3444654..c74fd0e5d50 100644 --- a/dojo/announcement/signals.py +++ b/dojo/announcement/signals.py @@ -1,4 +1,3 @@ -from django.conf import settings from django.db.models.signals import post_save from django.dispatch import receiver @@ -7,22 +6,11 @@ @receiver(post_save, sender=Dojo_User) def add_announcement_to_new_user(sender, instance, **kwargs): - announcements = Announcement.objects.all() - if announcements.count() > 0: - dojo_user = Dojo_User.objects.get(id=instance.id) - announcement = announcements.first() - cloud_announcement = ( - "DefectDojo Pro Cloud and On-Premise Subscriptions Now Available!" - in announcement.message + announcement = Announcement.objects.first() + if announcement is not None: + UserAnnouncement.objects.get_or_create( + user=instance, announcement=announcement, ) - if not cloud_announcement or settings.CREATE_CLOUD_BANNER: - user_announcements = UserAnnouncement.objects.filter( - user=dojo_user, announcement=announcement, - ) - if user_announcements.count() == 0: - UserAnnouncement.objects.get_or_create( - user=dojo_user, announcement=announcement, - ) @receiver(post_save, sender=Announcement) diff --git a/dojo/api_v2/permissions.py b/dojo/api_v2/permissions.py index 807bbcf7b2a..10e012ede5b 100644 --- a/dojo/api_v2/permissions.py +++ b/dojo/api_v2/permissions.py @@ -473,7 +473,13 @@ def has_permission(self, request, view): # Raise an explicit drf exception here raise ValidationError(e) if engagement := converted_dict.get("engagement"): - # existing engagement, nothing special to check + # Validate the resolved engagement's parent chain matches any provided identifiers + if (product := converted_dict.get("product")) and engagement.product_id != product.id: + msg = "The provided identifiers are inconsistent — the engagement does not belong to the specified product." + raise ValidationError(msg) + if (engagement_name := converted_dict.get("engagement_name")) and engagement.name != engagement_name: + msg = "The provided identifiers are inconsistent — the engagement name does not match the specified engagement." + raise ValidationError(msg) return user_has_permission( request.user, engagement, Permissions.Import_Scan_Result, ) @@ -764,6 +770,11 @@ def has_permission(self, request, view): try: converted_dict = auto_create.convert_querydict_to_dict(request.data) auto_create.process_import_meta_data_from_dict(converted_dict) + # engagement is not a declared field on ReImportScanSerializer and will be + # stripped during validation — don't use it in the permission check either, + # so the permission check resolves targets the same way execution does + converted_dict.pop("engagement", None) + converted_dict.pop("engagement_id", None) # Get an existing product converted_dict["product_type"] = auto_create.get_target_product_type_if_exists(**converted_dict) converted_dict["product"] = auto_create.get_target_product_if_exists(**converted_dict) @@ -774,7 +785,20 @@ def has_permission(self, request, view): raise ValidationError(e) if test := converted_dict.get("test"): - # existing test, nothing special to check + # Validate the resolved test's parent chain matches any provided identifiers + if (product := converted_dict.get("product")) and test.engagement.product_id != product.id: + msg = "The provided identifiers are inconsistent — the test does not belong to the specified product." + raise ValidationError(msg) + if (engagement := converted_dict.get("engagement")) and test.engagement_id != engagement.id: + msg = "The provided identifiers are inconsistent — the test does not belong to the specified engagement." + raise ValidationError(msg) + # Also validate by name when the objects were not resolved (e.g. names that match no existing record) + if not converted_dict.get("product") and (product_name := converted_dict.get("product_name")) and test.engagement.product.name != product_name: + msg = "The provided identifiers are inconsistent — the test does not belong to the specified product." + raise ValidationError(msg) + if not converted_dict.get("engagement") and (engagement_name := converted_dict.get("engagement_name")) and test.engagement.name != engagement_name: + msg = "The provided identifiers are inconsistent — the test does not belong to the specified engagement." + raise ValidationError(msg) return user_has_permission( request.user, test, Permissions.Import_Scan_Result, ) @@ -1181,7 +1205,10 @@ def check_auto_create_permission( raise ValidationError(msg) if engagement: - # existing engagement, nothing special to check + # Validate the resolved engagement's parent chain matches any provided names + if product is not None and engagement.product_id != product.id: + msg = "The provided identifiers are inconsistent — the engagement does not belong to the specified product." + raise ValidationError(msg) return user_has_permission( user, engagement, Permissions.Import_Scan_Result, ) diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 342287aba94..7c9bbd6ae79 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -1123,6 +1123,18 @@ def validate(self, data): if data.get("target_start") > data.get("target_end"): msg = "Your target start date exceeds your target end date" raise serializers.ValidationError(msg) + if ( + self.instance is not None + and "product" in data + and data.get("product") != self.instance.product + and not user_has_permission( + self.context["request"].user, + data.get("product"), + Permissions.Engagement_Edit, + ) + ): + msg = "You are not permitted to edit engagements in the destination product" + raise PermissionDenied(msg) return data def build_relational_field(self, field_name, relation_info): diff --git a/dojo/context_processors.py b/dojo/context_processors.py index cc53af0f1e0..792e1eb6b42 100644 --- a/dojo/context_processors.py +++ b/dojo/context_processors.py @@ -5,13 +5,14 @@ from django.conf import settings from django.contrib import messages +from dojo.announcement.os_message import get_os_banner from dojo.labels import get_labels from dojo.models import Alerts, System_Settings, UserAnnouncement def globalize_vars(request): # return the value you want as a dictionnary. you may add multiple values in there. - return { + context = { "SHOW_LOGIN_FORM": settings.SHOW_LOGIN_FORM, "FORGOT_PASSWORD": settings.FORGOT_PASSWORD, "FORGOT_USERNAME": settings.FORGOT_USERNAME, @@ -35,11 +36,32 @@ def globalize_vars(request): "DOCUMENTATION_URL": settings.DOCUMENTATION_URL, "API_TOKENS_ENABLED": settings.API_TOKENS_ENABLED, "API_TOKEN_AUTH_ENDPOINT_ENABLED": settings.API_TOKEN_AUTH_ENDPOINT_ENABLED, - "CREATE_CLOUD_BANNER": settings.CREATE_CLOUD_BANNER, + "SHOW_PLG_LINK": True, # V3 Feature Flags "V3_FEATURE_LOCATIONS": settings.V3_FEATURE_LOCATIONS, } + additional_banners = [] + + if (os_banner := get_os_banner()) is not None: + additional_banners.append({ + "source": "os", + "message": os_banner["message"], + "style": "info", + "url": "", + "link_text": "", + "expanded_html": os_banner["expanded_html"], + }) + + if hasattr(request, "session"): + for banner in request.session.pop("_product_banners", []): + additional_banners.append(banner) + + if additional_banners: + context["additional_banners"] = additional_banners + + return context + def bind_system_settings(request): """Load system settings and display warning if there's a database error.""" diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py index 1b94ace21f4..ca36d81c0dd 100644 --- a/dojo/engagement/views.py +++ b/dojo/engagement/views.py @@ -286,6 +286,12 @@ def edit_engagement(request, eid): if form.is_valid(): # first save engagement details new_status = form.cleaned_data.get("status") + if form.cleaned_data.get("product") != engagement.product: + user_has_permission_or_403( + request.user, + form.cleaned_data.get("product"), + Permissions.Engagement_Edit, + ) engagement.product = form.cleaned_data.get("product") engagement = form.save(commit=False) if (new_status in {"Cancelled", "Completed"}): @@ -1442,9 +1448,10 @@ def view_edit_risk_acceptance(request, eid, raid, *, edit_mode=False): "Since you are not the note's author, it was not deleted.", extra_tags="alert-danger") - if "remove_finding" in request.POST: + if edit_mode and "remove_finding" in request.POST: finding = get_object_or_404( - Finding, pk=request.POST["remove_finding_id"]) + risk_acceptance.accepted_findings, + pk=request.POST["remove_finding_id"]) ra_helper.remove_finding_from_risk_acceptance(request.user, risk_acceptance, finding) diff --git a/dojo/importers/auto_create_context.py b/dojo/importers/auto_create_context.py index 26d37ae65b0..916bbea056e 100644 --- a/dojo/importers/auto_create_context.py +++ b/dojo/importers/auto_create_context.py @@ -181,6 +181,9 @@ def get_target_engagement_if_exists( """ if engagement := get_object_or_none(Engagement, pk=engagement_id): logger.debug("Using existing engagement by id: %s", engagement_id) + if product is not None and engagement.product_id != product.id: + msg = "The provided identifiers are inconsistent — the engagement does not belong to the specified product." + raise ValueError(msg) return engagement # if there's no product, then for sure there's no engagement either if product is None: @@ -203,6 +206,9 @@ def get_target_test_if_exists( """ if test := get_object_or_none(Test, pk=test_id): logger.debug("Using existing Test by id: %s", test_id) + if engagement is not None and test.engagement_id != engagement.id: + msg = "The provided identifiers are inconsistent — the test does not belong to the specified engagement." + raise ValueError(msg) return test # If the engagement is not supplied, we cannot do anything if not engagement: diff --git a/dojo/management/commands/complete_initialization.py b/dojo/management/commands/complete_initialization.py index 556c77867fb..9ae58926324 100644 --- a/dojo/management/commands/complete_initialization.py +++ b/dojo/management/commands/complete_initialization.py @@ -14,7 +14,6 @@ from django.db.utils import ProgrammingError from dojo.auditlog import configure_pghistory_triggers -from dojo.models import Announcement, Dojo_User, UserAnnouncement class Command(BaseCommand): @@ -38,13 +37,11 @@ def handle(self, *args: Any, **options: Any) -> None: if self.admin_user_exists(): self.stdout.write("Admin user already exists; skipping first-boot setup") - self.create_announcement_banner() self.initialize_data() return self.ensure_admin_secrets() self.first_boot_setup() - self.create_announcement_banner() self.initialize_data() # ------------------------------------------------------------------ @@ -58,29 +55,6 @@ def initialize_data(self) -> None: self.stdout.write("Initializing non-standard permissions") call_command("initialize_permissions") - def create_announcement_banner(self) -> None: - if os.getenv("DD_CREATE_CLOUD_BANNER"): - return - - self.stdout.write("Creating announcement banner") - - announcement, _ = Announcement.objects.get_or_create(id=1) - announcement.message = ( - '' - "DefectDojo Pro Cloud and On-Premise Subscriptions Now Available! " - "Create an account to try Pro for free!" - "" - ) - announcement.dismissable = True - announcement.save() - - for user in Dojo_User.objects.all(): - UserAnnouncement.objects.get_or_create( - user=user, - announcement=announcement, - ) - # ------------------------------------------------------------------ # Auditlog consistency # ------------------------------------------------------------------ diff --git a/dojo/product_announcements.py b/dojo/product_announcements.py index 8510b42a0f8..90280885007 100644 --- a/dojo/product_announcements.py +++ b/dojo/product_announcements.py @@ -1,8 +1,6 @@ import logging -from django.conf import settings -from django.contrib import messages from django.http import HttpRequest, HttpResponse from django.utils.safestring import mark_safe from django.utils.translation import gettext_lazy as _ @@ -30,12 +28,8 @@ def __init__( response_data: dict | None = None, **kwargs: dict, ): - """Skip all this if the CREATE_CLOUD_BANNER is not set""" - if not settings.CREATE_CLOUD_BANNER: - return - # Fill in the vars if the were supplied correctly if request is not None and isinstance(request, HttpRequest): - self._add_django_message( + self._add_session_banner( request=request, message=mark_safe(f"{self.base_message} {self.ui_outreach}"), ) @@ -51,18 +45,21 @@ def __init__( msg = "At least one of request, response, or response_data must be supplied" raise ValueError(msg) - def _add_django_message(self, request: HttpRequest, message: str): - """Add a message to the UI""" + def _add_session_banner(self, request: HttpRequest, message: str): + """Store a banner in the session for rendering via additional_banners.""" try: - messages.add_message( - request=request, - level=messages.INFO, - message=_(message), - extra_tags="alert-info", - ) + banners = request.session.get("_product_banners", []) + banners.append({ + "source": "product_announcement", + "message": str(_(message)), + "style": "info", + "url": "", + "link_text": "", + "expanded_html": None, + }) + request.session["_product_banners"] = banners except Exception: - # make sure we catch any exceptions that might happen: https://github.com/DefectDojo/django-DefectDojo/issues/14041 - logger.exception(f"Error adding message to Django: {message}") + logger.exception(f"Error storing product announcement banner: {message}") def _add_api_response_key(self, message: str, data: dict) -> dict: """Update the response data in place""" diff --git a/dojo/risk_acceptance/api.py b/dojo/risk_acceptance/api.py index 2fdaadf0afb..4db4e09c76d 100644 --- a/dojo/risk_acceptance/api.py +++ b/dojo/risk_acceptance/api.py @@ -1,18 +1,20 @@ from abc import ABC, abstractmethod from typing import NamedTuple +from django.core.exceptions import PermissionDenied from django.db.models import QuerySet from django.utils import timezone from drf_spectacular.utils import extend_schema from rest_framework import serializers, status from rest_framework.decorators import action -from rest_framework.permissions import IsAdminUser +from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response +from dojo.api_v2.permissions import UserHasRiskAcceptanceRelatedObjectPermission from dojo.api_v2.serializers import RiskAcceptanceSerializer from dojo.authorization.roles_permissions import Permissions from dojo.engagement.queries import get_authorized_engagements -from dojo.models import Risk_Acceptance, User, Vulnerability_Id +from dojo.models import Engagement, Risk_Acceptance, User, Vulnerability_Id AcceptedRisk = NamedTuple("AcceptedRisk", (("vulnerability_id", str), ("justification", str), ("accepted_by", str))) @@ -40,10 +42,13 @@ def risk_application_model_class(self): request=AcceptedRiskSerializer(many=True), responses={status.HTTP_201_CREATED: RiskAcceptanceSerializer(many=True)}, ) - @action(methods=["post"], detail=True, permission_classes=[IsAdminUser], serializer_class=AcceptedRiskSerializer, - filter_backends=[], pagination_class=None) + @action(methods=["post"], detail=True, permission_classes=[IsAuthenticated, UserHasRiskAcceptanceRelatedObjectPermission], + serializer_class=AcceptedRiskSerializer, filter_backends=[], pagination_class=None) def accept_risks(self, request, pk=None): model = self.get_object() + product = model.product if isinstance(model, Engagement) else model.engagement.product + if not product.enable_full_risk_acceptance: + raise PermissionDenied serializer = AcceptedRiskSerializer(data=request.data, many=True) if serializer.is_valid(): accepted_risks = serializer.save() @@ -63,7 +68,7 @@ class AcceptedFindingsMixin(ABC): request=AcceptedRiskSerializer(many=True), responses={status.HTTP_201_CREATED: RiskAcceptanceSerializer(many=True)}, ) - @action(methods=["post"], detail=False, permission_classes=[IsAdminUser], serializer_class=AcceptedRiskSerializer) + @action(methods=["post"], detail=False, permission_classes=[IsAuthenticated], serializer_class=AcceptedRiskSerializer) def accept_risks(self, request): serializer = AcceptedRiskSerializer(data=request.data, many=True) if serializer.is_valid(): @@ -72,7 +77,9 @@ def accept_risks(self, request): return Response(data=serializer.errors, status=status.HTTP_400_BAD_REQUEST) owner = request.user accepted_result = [] - for engagement in get_authorized_engagements(Permissions.Engagement_View): + for engagement in get_authorized_engagements(Permissions.Risk_Acceptance): + if not engagement.product.enable_full_risk_acceptance: + continue base_findings = engagement.unaccepted_open_findings accepted = _accept_risks(accepted_risks, base_findings, owner) engagement.accept_risks(accepted) diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index 429b5646b90..b922005ff0a 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -356,8 +356,6 @@ DD_HASHCODE_FIELDS_PER_SCANNER=(str, ""), # Set deduplication algorithms per parser, via en env variable that contains a JSON string DD_DEDUPLICATION_ALGORITHM_PER_PARSER=(str, ""), - # Dictates whether cloud banner is created or not - DD_CREATE_CLOUD_BANNER=(bool, True), # With this setting turned on, Dojo maintains an audit log of changes made to entities (Findings, Tests, Engagements, Products, ...) # If you run big import you may want to disable this because there's a performance hit during (re-)imports. DD_ENABLE_AUDITLOG=(bool, True), @@ -1339,13 +1337,6 @@ def saml2_attrib_map_format(din): "expires": int(60 * 1 * 1.2), # If a task is not executed within 72 seconds, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. }, }, - "trigger_evaluate_pro_proposition": { - "task": "dojo.tasks.evaluate_pro_proposition", - "schedule": timedelta(hours=8), - "options": { - "expires": int(60 * 60 * 8 * 1.2), # If a task is not executed within 9.6 hours, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. - }, - }, "clear_sessions": { "task": "dojo.tasks.clear_sessions", "schedule": crontab(hour=0, minute=0, day_of_week=0), @@ -2036,6 +2027,7 @@ def saml2_attrib_map_format(din): "KHV": "https://avd.aquasec.com/misconfig/kubernetes/", # e.g. https://avd.aquasec.com/misconfig/kubernetes/khv045 "LEN-": "https://support.lenovo.com/cl/de/product_security/", # e.g. https://support.lenovo.com/cl/de/product_security/LEN-94953 "MAL-": "https://cvepremium.circl.lu/vuln/", # e.g. https://cvepremium.circl.lu/vuln/mal-2025-49305 + "MFSA": "https://www.mozilla.org/en-US/security/advisories/", # e.g. https://www.mozilla.org/en-US/security/advisories/mfsa2025-01/ "MGAA-": "https://advisories.mageia.org/&&.html", # e.g. https://advisories.mageia.org/MGAA-2013-0054.html "MGASA-": "https://advisories.mageia.org/&&.html", # e.g. https://advisories.mageia.org/MGASA-2025-0023.html "MSRC_": "https://cvepremium.circl.lu/vuln/", # e.g. https://cvepremium.circl.lu/vuln/msrc_cve-2025-59200 @@ -2082,9 +2074,6 @@ def saml2_attrib_map_format(din): AUDITLOG_DISABLE_ON_RAW_SAVE = False # You can set extra Jira headers by suppling a dictionary in header: value format (pass as env var like "headr_name=value,another_header=anohter_value") ADDITIONAL_HEADERS = env("DD_ADDITIONAL_HEADERS") -# Dictates whether cloud banner is created or not -CREATE_CLOUD_BANNER = env("DD_CREATE_CLOUD_BANNER") - # ------------------------------------------------------------------------------ # Auditlog # ------------------------------------------------------------------------------ diff --git a/dojo/static/dojo/css/dojo.css b/dojo/static/dojo/css/dojo.css index deba72474c9..fea91da6c94 100644 --- a/dojo/static/dojo/css/dojo.css +++ b/dojo/static/dojo/css/dojo.css @@ -1124,6 +1124,41 @@ div.custom-search-form { .announcement-banner { margin: 0px -15px; border-radius: 0px 0px 4px 4px; + color: #000; +} + +.announcement-banner a { + color: #0645ad; + text-decoration: underline; +} + +.announcement-banner strong, +.announcement-banner b { + color: #222; +} + +.banner-toggle { + background: transparent; + border: 0; + padding: 0; + margin-left: 6px; + color: inherit; + cursor: pointer; + line-height: 1; +} + +.banner-toggle:focus, +.banner-toggle:active { + outline: none; + box-shadow: none; +} + +.banner-toggle:not(.collapsed) .fa-caret-down { + transform: rotate(180deg); +} + +.banner-expanded { + margin-top: 8px; } @media (min-width: 795px) { diff --git a/dojo/tasks.py b/dojo/tasks.py index 5a494072a8b..d1f90275dc2 100644 --- a/dojo/tasks.py +++ b/dojo/tasks.py @@ -16,9 +16,8 @@ from dojo.celery import app from dojo.celery_dispatch import dojo_dispatch_task from dojo.finding.helper import fix_loop_duplicates -from dojo.location.models import Location from dojo.management.commands.jira_status_reconciliation import jira_status_reconciliation -from dojo.models import Alerts, Announcement, Endpoint, Engagement, Finding, Product, System_Settings, User +from dojo.models import Alerts, Engagement, Finding, Product, System_Settings, User from dojo.notifications.helper import create_notification from dojo.utils import calculate_grade, sla_compute_and_notify @@ -218,37 +217,6 @@ def fix_loop_duplicates_task(*args, **kwargs): return fix_loop_duplicates() -@app.task -def evaluate_pro_proposition(*args, **kwargs): - # Ensure we should be doing this - if not settings.CREATE_CLOUD_BANNER: - return - # Get the announcement object - announcement = Announcement.objects.get_or_create(id=1)[0] - # Quick check for a user has modified the current banner - if not, exit early as we dont want to stomp - if not any( - entry in announcement.message - for entry in [ - "", - "DefectDojo Pro Cloud and On-Premise Subscriptions Now Available!", - "Findings/Endpoints in their systems", - ] - ): - return - # Count the objects the determine if the banner should be updated - if settings.V3_FEATURE_LOCATIONS: - object_count = Finding.objects.count() + Location.objects.count() - else: - # TODO: Delete this after the move to Locations - object_count = Finding.objects.count() + Endpoint.objects.count() - # Unless the count is greater than 100k, exit early - if object_count < 100000: - return - # Update the announcement - announcement.message = f'Only professionals have {object_count:,} Findings and Endpoints in their systems... Get DefectDojo Pro today!' - announcement.save() - - @app.task def clear_sessions(*args, **kwargs): call_command("clearsessions") diff --git a/dojo/templates/base.html b/dojo/templates/base.html index 54d6e0e58af..b8f5489f1d7 100644 --- a/dojo/templates/base.html +++ b/dojo/templates/base.html @@ -199,7 +199,7 @@ {% endif %} - {% if CREATE_CLOUD_BANNER %} + {% if SHOW_PLG_LINK %}
  • @@ -671,8 +671,21 @@ {% endif %} {% for banner in additional_banners %} -
  • Major feature A
  • ", result["expanded_html"]) + self.assertIn("
  • Major feature B
  • ", result["expanded_html"]) + + def test_missing_headline_returns_none(self): + text = "No headline here\n## Expanded Message\nbody\n" + self.assertIsNone(os_message.parse_os_message(text)) + + def test_headline_inline_markdown(self): + text = "# Read the **release notes** at [link](https://example.com)\n" + result = os_message.parse_os_message(text) + self.assertIn("release notes", result["message"]) + self.assertIn('link', result["message"]) + self.assertIsNone(result["expanded_html"]) + + def test_headline_strips_disallowed_html(self): + text = "# Headline tail\n" + result = os_message.parse_os_message(text) + self.assertNotIn("", result["message"]) + self.assertIn("Headline", result["message"]) + + def test_missing_expanded_section(self): + text = "# Just a headline\n" + result = os_message.parse_os_message(text) + self.assertEqual(result["message"], "Just a headline") + self.assertIsNone(result["expanded_html"]) + + def test_expanded_with_fenced_code(self): + text = ( + "# Headline\n" + "## Expanded Message\n" + "```python\n" + "print('hi')\n" + "```\n" + ) + result = os_message.parse_os_message(text) + self.assertIn("
    ", result["expanded_html"])
    +        self.assertIn("", result["expanded_html"])
    +        self.assertIn("print('hi')", result["expanded_html"])
    +
    +    def test_expanded_strips_script_tag(self):
    +        text = (
    +            "# Headline\n"
    +            "## Expanded Message\n"
    +            "\n"
    +            "Body paragraph\n"
    +        )
    +        result = os_message.parse_os_message(text)
    +        self.assertNotIn("", result["expanded_html"])
    +        self.assertIn("Body paragraph", result["expanded_html"])
    +
    +    def test_headline_outer_p_is_stripped(self):
    +        text = "# Plain headline\n"
    +        result = os_message.parse_os_message(text)
    +        self.assertFalse(result["message"].startswith("

    ")) + self.assertFalse(result["message"].endswith("

    ")) + + def test_headline_truncated_to_100_chars(self): + long_headline = "x" * 200 + text = f"# {long_headline}\n" + result = os_message.parse_os_message(text) + self.assertLessEqual(len(result["message"]), 100) + + +@override_settings(CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}}) +class TestFetchOsMessage(SimpleTestCase): + + def setUp(self): + cache.clear() + + def test_200_with_body_caches_body(self): + body = "# headline\n" + with patch("dojo.announcement.os_message.requests.get", return_value=_Resp(200, body)) as mock_get: + result = os_message.fetch_os_message() + self.assertEqual(result, body) + self.assertEqual(cache.get(os_message.CACHE_KEY), body) + mock_get.assert_called_once() + + def test_404_caches_none(self): + with patch("dojo.announcement.os_message.requests.get", return_value=_Resp(404, "not found")): + result = os_message.fetch_os_message() + self.assertIsNone(result) + self.assertIsNone(cache.get(os_message.CACHE_KEY, default="sentinel")) + + def test_timeout_caches_none(self): + with patch("dojo.announcement.os_message.requests.get", side_effect=requests.exceptions.Timeout): + result = os_message.fetch_os_message() + self.assertIsNone(result) + self.assertIsNone(cache.get(os_message.CACHE_KEY, default="sentinel")) + + def test_connection_error_caches_none(self): + with patch("dojo.announcement.os_message.requests.get", side_effect=requests.exceptions.ConnectionError): + result = os_message.fetch_os_message() + self.assertIsNone(result) + self.assertIsNone(cache.get(os_message.CACHE_KEY, default="sentinel")) + + def test_empty_body_caches_none(self): + with patch("dojo.announcement.os_message.requests.get", return_value=_Resp(200, " \n\n")): + result = os_message.fetch_os_message() + self.assertIsNone(result) + self.assertIsNone(cache.get(os_message.CACHE_KEY, default="sentinel")) + + def test_second_call_hits_cache(self): + with patch("dojo.announcement.os_message.requests.get", return_value=_Resp(200, "# h\n")) as mock_get: + os_message.fetch_os_message() + os_message.fetch_os_message() + self.assertEqual(mock_get.call_count, 1) + + def test_second_call_after_failure_also_hits_cache(self): + with patch("dojo.announcement.os_message.requests.get", side_effect=requests.exceptions.Timeout) as mock_get: + os_message.fetch_os_message() + os_message.fetch_os_message() + self.assertEqual(mock_get.call_count, 1) + + +@override_settings(CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}}) +class TestGetOsBanner(SimpleTestCase): + + def setUp(self): + cache.clear() + + def test_returns_none_when_fetch_returns_none(self): + with patch("dojo.announcement.os_message.fetch_os_message", return_value=None): + self.assertIsNone(os_message.get_os_banner()) + + def test_swallows_parse_exception(self): + with patch("dojo.announcement.os_message.fetch_os_message", return_value="# ok\n"), \ + patch("dojo.announcement.os_message.parse_os_message", side_effect=RuntimeError("boom")): + self.assertIsNone(os_message.get_os_banner()) + + +@override_settings(CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}}) +class TestGlobalizeVarsOsBanner(SimpleTestCase): + + def setUp(self): + cache.clear() + self.request = RequestFactory().get("/") + + def test_additional_banners_populated_when_banner_present(self): + banner = {"message": "Hi", "expanded_html": "

    body

    "} + with patch.object(context_processors, "get_os_banner", return_value=banner): + result = context_processors.globalize_vars(self.request) + self.assertIn("additional_banners", result) + entry = result["additional_banners"][0] + self.assertEqual(entry["source"], "os") + self.assertEqual(entry["message"], "Hi") + self.assertEqual(entry["expanded_html"], "

    body

    ") + self.assertEqual(entry["style"], "info") + self.assertEqual(entry["url"], "") + self.assertEqual(entry["link_text"], "") + + def test_additional_banners_absent_when_no_banner(self): + with patch.object(context_processors, "get_os_banner", return_value=None): + result = context_processors.globalize_vars(self.request) + self.assertNotIn("additional_banners", result) + + def test_show_plg_link_is_true_by_default(self): + with patch.object(context_processors, "get_os_banner", return_value=None): + result = context_processors.globalize_vars(self.request) + self.assertTrue(result["SHOW_PLG_LINK"]) + + def test_create_cloud_banner_not_in_context(self): + with patch.object(context_processors, "get_os_banner", return_value=None): + result = context_processors.globalize_vars(self.request) + self.assertNotIn("CREATE_CLOUD_BANNER", result) + + def test_template_renders_bleached_message(self): + banner = {"message": "Hi", "expanded_html": None} + with patch.object(context_processors, "get_os_banner", return_value=banner): + ctx = context_processors.globalize_vars(self.request) + rendered = Template( + "{% for b in additional_banners %}{{ b.message|safe }}{% endfor %}", + ).render(Context(ctx)) + self.assertIn("Hi", rendered) + + def test_session_product_banners_merged_into_additional_banners(self): + session_banner = { + "source": "product_announcement", + "message": "Pro has async imports!", + "style": "info", + "url": "", + "link_text": "", + "expanded_html": None, + } + self.request.session = {"_product_banners": [session_banner]} + with patch.object(context_processors, "get_os_banner", return_value=None): + result = context_processors.globalize_vars(self.request) + self.assertIn("additional_banners", result) + self.assertEqual(len(result["additional_banners"]), 1) + self.assertEqual(result["additional_banners"][0]["source"], "product_announcement") + self.assertEqual(self.request.session.get("_product_banners"), None) + + def test_os_and_session_banners_combined(self): + os_banner = {"message": "OS msg", "expanded_html": None} + session_banner = { + "source": "product_announcement", + "message": "Pro msg", + "style": "info", + "url": "", + "link_text": "", + "expanded_html": None, + } + self.request.session = {"_product_banners": [session_banner]} + with patch.object(context_processors, "get_os_banner", return_value=os_banner): + result = context_processors.globalize_vars(self.request) + self.assertEqual(len(result["additional_banners"]), 2) + self.assertEqual(result["additional_banners"][0]["source"], "os") + self.assertEqual(result["additional_banners"][1]["source"], "product_announcement") diff --git a/unittests/test_pdf_report_rendering.py b/unittests/test_pdf_report_rendering.py new file mode 100644 index 00000000000..1cdfe1e34e1 --- /dev/null +++ b/unittests/test_pdf_report_rendering.py @@ -0,0 +1,223 @@ +from django.template import engines +from django.utils.timezone import now + +from dojo.models import ( + Engagement, + Finding, + Product, + Product_Type, + Test, + Test_Type, + User, +) +from unittests.dojo_test_case import DojoTestCase, versioned_fixtures + + +@versioned_fixtures +class TestPdfReportTextWrapping(DojoTestCase): + + """ + Tests that PDF report templates render long and pre-wrapped content + within margins instead of overflowing. + """ + + fixtures = ["dojo_testdata.json"] + + LONG_URL = "https://app.example.com/assets/vendor-" + "a1b2c3d4" * 8 + ".js.map" + + # Content with an embedded
     tag, simulating imports (e.g. BugCrowd CSV)
    +    # that store HTML-wrapped text in finding fields.
    +    DESCRIPTION_WITH_PRE = (
    +        "
    \n"
    +        "An internal debug configuration file (debug-config-e7f3a901.json) is publicly "
    +        "accessible at the URL: " + LONG_URL + ". "
    +        "Debug configuration files can reveal internal service addresses, feature flags, "
    +        "and environment variables. Exposing such files can leak sensitive information "
    +        "about the application infrastructure, aiding attackers in lateral movement and "
    +        "facilitating exploitation of internal services.\n"
    +        "
    " + ) + + MITIGATION_WITH_PRE = ( + '
    \n'
    +        "Remove Debug Files From Public Directories: Ensure .json debug configuration "
    +        "files are not deployed to publicly accessible paths on the web server.\n"
    +        "Restrict Access: If debug configurations are required in staging environments, "
    +        "restrict access to authenticated admin users only via IP whitelisting.\n"
    +        "Environment-Specific Builds: Use separate build profiles for development and "
    +        "production to ensure debug artifacts are excluded from release bundles.\n"
    +        "Audit Build Artifacts: Regularly scan deployment artifacts for unintended "
    +        "inclusion of debug or configuration files.\n"
    +        "
    " + ) + + IMPACT_WITH_PRE = ( + '
    \n'
    +        "Information Disclosure: Attackers can discover internal microservice endpoints, "
    +        "feature flag states, and environment-specific configuration values.\n"
    +        "Lateral Movement: Revealed internal addresses may allow attackers to probe "
    +        "backend services that are not intended to be publicly reachable.\n"
    +        "Credential Exposure: If the debug configuration includes API keys or tokens "
    +        "left by developers, this could lead to unauthorized access.\n"
    +        "
    " + ) + + STEPS_WITH_PRE = ( + '
    \n'
    +        "Open a web browser.\n"
    +        "Navigate to the URL: " + LONG_URL + ".\n"
    +        "Observe that the configuration file is accessible and can be downloaded.\n"
    +        "Review the file contents for internal service addresses and environment variables.\n"
    +        "
    " + ) + + # Plain markdown content (no embedded
     tags) with a very long unbroken token
    +    DESCRIPTION_LONG_TOKEN = (
    +        "A session token was observed in the query string: "
    +        "token=" + "x" * 300 + " "
    +        "which exceeds normal length and may cause rendering issues in reports."
    +    )
    +
    +    def setUp(self):
    +        super().setUp()
    +        self.user = User.objects.get(username="admin")
    +        self.product_type = Product_Type.objects.create(name="Report Test PT")
    +        self.product = Product.objects.create(
    +            name="Report Test Product",
    +            description="Product for report tests",
    +            prod_type=self.product_type,
    +        )
    +        self.engagement = Engagement.objects.create(
    +            name="Report Test Engagement",
    +            product=self.product,
    +            target_start=now(),
    +            target_end=now(),
    +        )
    +        self.test_type = Test_Type.objects.create(name="Report Test Scan")
    +        self.test_obj = Test.objects.create(
    +            engagement=self.engagement,
    +            test_type=self.test_type,
    +            title="Report Rendering Test",
    +            target_start=now(),
    +            target_end=now(),
    +        )
    +        self.django_engine = engines["django"]
    +
    +    def _create_finding(self, **kwargs):
    +        defaults = {
    +            "title": "Debug Configuration File Exposed",
    +            "test": self.test_obj,
    +            "severity": "Medium",
    +            "description": self.DESCRIPTION_WITH_PRE,
    +            "mitigation": self.MITIGATION_WITH_PRE,
    +            "impact": self.IMPACT_WITH_PRE,
    +            "steps_to_reproduce": self.STEPS_WITH_PRE,
    +            "active": True,
    +            "verified": True,
    +            "reporter": self.user,
    +            "numerical_severity": "S2",
    +            "date": now().date(),
    +        }
    +        defaults.update(kwargs)
    +        return Finding.objects.create(**defaults)
    +
    +    def _render_finding_report(self, findings):
    +        """Render finding_pdf_report.html with the given findings and return HTML."""
    +        template = self.django_engine.get_template("dojo/finding_pdf_report.html")
    +        context = {
    +            "report_name": "Finding Report",
    +            "findings": findings,
    +            "include_finding_notes": 0,
    +            "include_finding_images": 0,
    +            "include_executive_summary": 0,
    +            "include_table_of_contents": 0,
    +            "include_disclaimer": 0,
    +            "disclaimer": "",
    +            "user": self.user,
    +            "team_name": "Test Team",
    +            "title": "Finding Report",
    +            "host": "http://localhost:8080",
    +            "user_id": self.user.id,
    +        }
    +        return template.render(context)
    +
    +    def test_no_nested_pre_tags_in_report(self):
    +        """
    +        Markdown-rendered fields should not produce nested 
     elements.
    +
    +        When imported data already contains 
     tags (common with BugCrowd CSV
    +        imports), the template wrapper must not add an additional 
     layer.
    +        The outer wrapper should be a 
    . + """ + finding = self._create_finding() + html = self._render_finding_report(Finding.objects.filter(pk=finding.pk)) + + # The template should wrap markdown-rendered fields in div.report-field, + # not in
     tags. We should not see 
     nesting.
    +        self.assertNotIn("
    ", html)
    +        self.assertNotIn("
    +        # Find the section containing our long token
    +        idx = html.index("x" * 300)
    +        # Walk backwards to find the nearest opening tag
    +        preceding = html[max(0, idx - 500):idx]
    +        self.assertIn("report-field", preceding)
    +
    +    def test_report_base_css_has_overflow_wrap(self):
    +        """The report base template must include overflow-wrap for text wrapping."""
    +        template = self.django_engine.get_template("report_base.html")
    +        source = template.template.source
    +
    +        self.assertIn("overflow-wrap: break-word", source)
    +
    +    def test_report_base_css_styles_nested_pre(self):
    +        """
    +        The report base CSS must style .report-field pre to prevent
    +        nested 
     elements from breaking out of margins.
    +        """
    +        template = self.django_engine.get_template("report_base.html")
    +        source = template.template.source
    +
    +        self.assertIn(".report-field pre", source)
    +        self.assertIn("overflow-wrap: break-word", source)
    +
    +    def test_raw_request_pre_tags_preserved(self):
    +        """
    +        Raw request/response 
     tags should remain unchanged.
    +
    +        Only markdown-rendered fields should use div.report-field wrappers.
    +        The raw_request class pre tags are for literal request/response data
    +        and should stay as 
    .
    +        """
    +        template = self.django_engine.get_template("dojo/finding_pdf_report.html")
    +        source = template.template.source
    +        self.assertIn('class="raw_request"', source)
    +        # raw_request should still be inside 
     tags
    +        self.assertIn('
    ', source)
    diff --git a/unittests/test_permissions_audit.py b/unittests/test_permissions_audit.py
    index 8904c6270be..9d4ca0a746e 100644
    --- a/unittests/test_permissions_audit.py
    +++ b/unittests/test_permissions_audit.py
    @@ -12,6 +12,7 @@
     8. Questionnaire cross-engagement IDOR (H1 #3571957)
     9. Finding Templates exposure via find_template_to_apply (H1 #3577363)
     10. Jira Epic BFLA - Reader cannot trigger update_jira_epic (H1 #3577193)
    +11. Risk Acceptance remove_finding: edit_mode guard + scoped finding lookup (PR #14633)
     """
     import datetime
     
    @@ -701,6 +702,266 @@ def test_view_risk_acceptance_same_engagement(self):
             self.assertEqual(response.status_code, 200)
     
     
    +class TestRiskAcceptanceRemoveFindingGuard(DojoTestCase):
    +
    +    """
    +    PR #14633: view_edit_risk_acceptance must:
    +    1. Only process 'remove_finding' when edit_mode is True (Writer+ via edit URL).
    +    2. Scope the finding lookup to risk_acceptance.accepted_findings (not global Finding).
    +
    +    Prevents a Reader from removing findings via the view URL, and prevents
    +    cross-product blind enumeration of finding IDs.
    +    """
    +
    +    @classmethod
    +    def setUpTestData(cls):
    +        cls.reader_role = Role.objects.get(name="Reader")
    +        cls.owner_role = Role.objects.get(name="Owner")
    +
    +        # ── Product A ────────────────────────────────────────────────
    +        cls.product_type_a = Product_Type.objects.create(
    +            name="RA Remove Guard Test PT A",
    +        )
    +        cls.product_a = Product.objects.create(
    +            name="RA Remove Guard Product A",
    +            description="Test",
    +            prod_type=cls.product_type_a,
    +            enable_full_risk_acceptance=True,
    +        )
    +
    +        # ── Product B (for cross-product IDOR test) ─────────────────
    +        cls.product_type_b = Product_Type.objects.create(
    +            name="RA Remove Guard Test PT B",
    +        )
    +        cls.product_b = Product.objects.create(
    +            name="RA Remove Guard Product B",
    +            description="Test",
    +            prod_type=cls.product_type_b,
    +            enable_full_risk_acceptance=True,
    +        )
    +
    +        # ── Users ────────────────────────────────────────────────────
    +        cls.reader_user_a = Dojo_User.objects.create_user(
    +            username="ra_remove_reader_a",
    +            password="testTEST1234!@#$",  # noqa: S106
    +            is_active=True,
    +        )
    +        cls.owner_user_a = Dojo_User.objects.create_user(
    +            username="ra_remove_owner_a",
    +            password="testTEST1234!@#$",  # noqa: S106
    +            is_active=True,
    +        )
    +        cls.owner_user_b = Dojo_User.objects.create_user(
    +            username="ra_remove_owner_b",
    +            password="testTEST1234!@#$",  # noqa: S106
    +            is_active=True,
    +        )
    +
    +        # ── Role assignments ─────────────────────────────────────────
    +        Product_Member.objects.create(
    +            product=cls.product_a,
    +            user=cls.reader_user_a,
    +            role=cls.reader_role,
    +        )
    +        Product_Member.objects.create(
    +            product=cls.product_a,
    +            user=cls.owner_user_a,
    +            role=cls.owner_role,
    +        )
    +        Product_Member.objects.create(
    +            product=cls.product_b,
    +            user=cls.owner_user_b,
    +            role=cls.owner_role,
    +        )
    +
    +        # ── Product A: engagement, test, findings, risk acceptance ───
    +        cls.engagement_a = Engagement.objects.create(
    +            name="RA Remove Guard Engagement A",
    +            product=cls.product_a,
    +            target_start=datetime.date(2024, 1, 1),
    +            target_end=datetime.date(2024, 12, 31),
    +        )
    +        test_type, _ = Test_Type.objects.get_or_create(
    +            name="Manual Code Review",
    +        )
    +        cls.test_a = Test.objects.create(
    +            engagement=cls.engagement_a,
    +            test_type=test_type,
    +            target_start=timezone.now(),
    +            target_end=timezone.now(),
    +        )
    +
    +        # Finding that IS in the risk acceptance
    +        cls.finding_a = Finding.objects.create(
    +            title="RA Remove Guard Finding A",
    +            test=cls.test_a,
    +            severity="High",
    +            numerical_severity="S1",
    +            active=False,
    +            risk_accepted=True,
    +            reporter=cls.owner_user_a,
    +        )
    +
    +        # Finding in same engagement but NOT in the risk acceptance
    +        cls.finding_a_extra = Finding.objects.create(
    +            title="RA Remove Guard Finding A Extra",
    +            test=cls.test_a,
    +            severity="Medium",
    +            numerical_severity="S2",
    +            active=True,
    +            risk_accepted=False,
    +            reporter=cls.owner_user_a,
    +        )
    +
    +        cls.risk_acceptance_a = Risk_Acceptance.objects.create(
    +            name="RA Remove Guard RA A",
    +            owner=cls.owner_user_a,
    +        )
    +        cls.risk_acceptance_a.accepted_findings.add(cls.finding_a)
    +        cls.engagement_a.risk_acceptance.add(cls.risk_acceptance_a)
    +
    +        # ── Product B: engagement, test, finding, risk acceptance ────
    +        cls.engagement_b = Engagement.objects.create(
    +            name="RA Remove Guard Engagement B",
    +            product=cls.product_b,
    +            target_start=datetime.date(2024, 1, 1),
    +            target_end=datetime.date(2024, 12, 31),
    +        )
    +        cls.test_b = Test.objects.create(
    +            engagement=cls.engagement_b,
    +            test_type=test_type,
    +            target_start=timezone.now(),
    +            target_end=timezone.now(),
    +        )
    +        cls.finding_b = Finding.objects.create(
    +            title="RA Remove Guard Finding B",
    +            test=cls.test_b,
    +            severity="High",
    +            numerical_severity="S1",
    +            active=False,
    +            risk_accepted=True,
    +            reporter=cls.owner_user_b,
    +        )
    +
    +        cls.risk_acceptance_b = Risk_Acceptance.objects.create(
    +            name="RA Remove Guard RA B",
    +            owner=cls.owner_user_b,
    +        )
    +        cls.risk_acceptance_b.accepted_findings.add(cls.finding_b)
    +        cls.engagement_b.risk_acceptance.add(cls.risk_acceptance_b)
    +
    +    def _login(self, username):
    +        client = Client()
    +        client.login(
    +            username=username,
    +            password="testTEST1234!@#$",  # noqa: S106
    +        )
    +        return client
    +
    +    def _remove_finding_data(self, finding_id):
    +        return {
    +            "remove_finding": "Remove",
    +            "remove_finding_id": finding_id,
    +        }
    +
    +    # ── Test 1: edit_mode guard (BFLA) ───────────────────────────────
    +
    +    def test_reader_cannot_remove_finding_via_view_url(self):
    +        """Reader POSTing remove_finding to view URL must be silently ignored."""
    +        client = self._login("ra_remove_reader_a")
    +        url = reverse("view_risk_acceptance", args=(
    +            self.engagement_a.id, self.risk_acceptance_a.id,
    +        ))
    +        response = client.post(url, self._remove_finding_data(self.finding_a.id))
    +        # View still redirects (302) because errors=False, but finding is untouched
    +        self.assertEqual(response.status_code, 302)
    +        self.finding_a.refresh_from_db()
    +        self.assertFalse(self.finding_a.active)
    +        self.assertTrue(self.finding_a.risk_accepted)
    +        self.assertTrue(
    +            self.risk_acceptance_a.accepted_findings
    +            .filter(pk=self.finding_a.pk)
    +            .exists(),
    +        )
    +
    +    # ── Test 2: positive regression (edit URL works) ─────────────────
    +
    +    def test_owner_can_remove_finding_via_edit_url(self):
    +        """Owner POSTing remove_finding to edit URL must succeed."""
    +        client = self._login("ra_remove_owner_a")
    +        url = reverse("edit_risk_acceptance", args=(
    +            self.engagement_a.id, self.risk_acceptance_a.id,
    +        ))
    +        response = client.post(url, self._remove_finding_data(self.finding_a.id))
    +        self.assertEqual(response.status_code, 302)
    +        self.finding_a.refresh_from_db()
    +        self.assertTrue(self.finding_a.active)
    +        self.assertFalse(self.finding_a.risk_accepted)
    +        self.assertFalse(
    +            self.risk_acceptance_a.accepted_findings
    +            .filter(pk=self.finding_a.pk)
    +            .exists(),
    +        )
    +
    +    # ── Test 3: scoped lookup (finding not in this RA) ───────────────
    +
    +    def test_finding_not_in_risk_acceptance_returns_404(self):
    +        """Supplying a finding ID not in the RA's accepted_findings must 404."""
    +        client = self._login("ra_remove_owner_a")
    +        url = reverse("edit_risk_acceptance", args=(
    +            self.engagement_a.id, self.risk_acceptance_a.id,
    +        ))
    +        # finding_a_extra exists in the same engagement but is NOT accepted
    +        response = client.post(
    +            url, self._remove_finding_data(self.finding_a_extra.id),
    +        )
    +        self.assertEqual(response.status_code, 404)
    +
    +    # ── Test 4: cross-product IDOR ───────────────────────────────────
    +
    +    def test_cross_product_finding_id_rejected(self):
    +        """Finding from Product B cannot be removed via Product A's RA."""
    +        client = self._login("ra_remove_owner_a")
    +        url = reverse("edit_risk_acceptance", args=(
    +            self.engagement_a.id, self.risk_acceptance_a.id,
    +        ))
    +        response = client.post(
    +            url, self._remove_finding_data(self.finding_b.id),
    +        )
    +        self.assertEqual(response.status_code, 404)
    +        # Product B's finding must remain untouched
    +        self.finding_b.refresh_from_db()
    +        self.assertFalse(self.finding_b.active)
    +        self.assertTrue(self.finding_b.risk_accepted)
    +
    +    # ── Test 5: Reader blocked by decorator on edit URL ──────────────
    +
    +    def test_reader_blocked_on_edit_url_by_decorator(self):
    +        """Reader lacks Risk_Acceptance permission — edit URL itself is denied."""
    +        client = self._login("ra_remove_reader_a")
    +        url = reverse("edit_risk_acceptance", args=(
    +            self.engagement_a.id, self.risk_acceptance_a.id,
    +        ))
    +        response = client.post(url, self._remove_finding_data(self.finding_a.id))
    +        # PermissionDenied raised; custom handler403 returns 400 (DD bug)
    +        self.assertIn(response.status_code, [400, 403])
    +        # Finding must remain untouched
    +        self.finding_a.refresh_from_db()
    +        self.assertFalse(self.finding_a.active)
    +        self.assertTrue(self.finding_a.risk_accepted)
    +
    +    # ── Test 6: nonexistent finding ID ───────────────────────────────
    +
    +    def test_nonexistent_finding_id_returns_404(self):
    +        """A bogus finding ID must produce 404 from the scoped lookup."""
    +        client = self._login("ra_remove_owner_a")
    +        url = reverse("edit_risk_acceptance", args=(
    +            self.engagement_a.id, self.risk_acceptance_a.id,
    +        ))
    +        response = client.post(url, self._remove_finding_data(999999))
    +        self.assertEqual(response.status_code, 404)
    +
    +
     class TestEngagementPresetsCrossProductIDOR(DojoTestCase):
     
         """
    @@ -1425,3 +1686,162 @@ def test_risk_acceptance_download_proof_writer_allowed(self):
     
             # Clean up uploaded file
             self.risk_acceptance.path.delete(save=True)
    +
    +
    +class TestEngagementMovePermission(DojoTestCase):
    +
    +    """Moving an engagement to another product requires Engagement_Edit on the destination."""
    +
    +    @classmethod
    +    def setUpTestData(cls):
    +        cls.owner_role = Role.objects.get(name="Owner")
    +        cls.product_type = Product_Type.objects.create(name="Eng Move Test PT")
    +
    +        cls.product_a = Product.objects.create(
    +            name="Eng Move Product A", description="Test", prod_type=cls.product_type,
    +        )
    +        cls.product_b = Product.objects.create(
    +            name="Eng Move Product B", description="Test", prod_type=cls.product_type,
    +        )
    +        cls.product_c = Product.objects.create(
    +            name="Eng Move Product C", description="Test", prod_type=cls.product_type,
    +        )
    +
    +        cls.user = Dojo_User.objects.create_user(
    +            username="eng_move_owner",
    +            password="testTEST1234!@#$",  # noqa: S106
    +            is_active=True,
    +        )
    +        Product_Member.objects.create(product=cls.product_a, user=cls.user, role=cls.owner_role)
    +        # No membership on product_b -- user cannot move engagements there
    +        Product_Member.objects.create(product=cls.product_c, user=cls.user, role=cls.owner_role)
    +
    +    def setUp(self):
    +        self.engagement = Engagement.objects.create(
    +            name="Move Test Engagement",
    +            product=self.product_a,
    +            target_start=datetime.date.today(),
    +            target_end=datetime.date.today(),
    +        )
    +
    +    def _api_client(self):
    +        token, _ = Token.objects.get_or_create(user=self.user)
    +        client = APIClient()
    +        client.credentials(HTTP_AUTHORIZATION="Token " + token.key)
    +        return client
    +
    +    def _ui_client(self):
    +        client = Client()
    +        client.login(username="eng_move_owner", password="testTEST1234!@#$")  # noqa: S106
    +        return client
    +
    +    # ── API: PATCH ────────────────────────────────────────────────────
    +
    +    def test_api_patch_move_to_authorized_product(self):
    +        """PATCH with product the user has access to should succeed."""
    +        client = self._api_client()
    +        url = reverse("engagement-detail", args=(self.engagement.id,))
    +        response = client.patch(url, {"product": self.product_c.id}, format="json")
    +        self.assertEqual(response.status_code, 200, response.content)
    +        self.engagement.refresh_from_db()
    +        self.assertEqual(self.engagement.product, self.product_c)
    +
    +    def test_api_patch_move_to_unauthorized_product(self):
    +        """PATCH with product the user lacks access to should be denied."""
    +        client = self._api_client()
    +        url = reverse("engagement-detail", args=(self.engagement.id,))
    +        response = client.patch(url, {"product": self.product_b.id}, format="json")
    +        self.assertEqual(response.status_code, 403, response.content)
    +        self.engagement.refresh_from_db()
    +        self.assertEqual(self.engagement.product, self.product_a)
    +
    +    def test_api_patch_same_product(self):
    +        """PATCH with the same product should succeed without extra permission check."""
    +        client = self._api_client()
    +        url = reverse("engagement-detail", args=(self.engagement.id,))
    +        response = client.patch(url, {"product": self.product_a.id}, format="json")
    +        self.assertEqual(response.status_code, 200, response.content)
    +
    +    def test_api_patch_without_product_field(self):
    +        """PATCH without product field should succeed (no spurious check)."""
    +        client = self._api_client()
    +        url = reverse("engagement-detail", args=(self.engagement.id,))
    +        response = client.patch(url, {"version": "1.0"}, format="json")
    +        self.assertEqual(response.status_code, 200, response.content)
    +
    +    # ── API: PUT ──────────────────────────────────────────────────────
    +
    +    def test_api_put_move_to_authorized_product(self):
    +        """PUT with product the user has access to should succeed."""
    +        client = self._api_client()
    +        url = reverse("engagement-detail", args=(self.engagement.id,))
    +        payload = {
    +            "name": "Move Test Engagement",
    +            "product": self.product_c.id,
    +            "target_start": str(datetime.date.today()),
    +            "target_end": str(datetime.date.today()),
    +            "engagement_type": "Interactive",
    +            "status": "Not Started",
    +        }
    +        response = client.put(url, payload, format="json")
    +        self.assertEqual(response.status_code, 200, response.content)
    +        self.engagement.refresh_from_db()
    +        self.assertEqual(self.engagement.product, self.product_c)
    +
    +    def test_api_put_move_to_unauthorized_product(self):
    +        """PUT with product the user lacks access to should be denied."""
    +        client = self._api_client()
    +        url = reverse("engagement-detail", args=(self.engagement.id,))
    +        payload = {
    +            "name": "Move Test Engagement",
    +            "product": self.product_b.id,
    +            "target_start": str(datetime.date.today()),
    +            "target_end": str(datetime.date.today()),
    +            "engagement_type": "Interactive",
    +            "status": "Not Started",
    +        }
    +        response = client.put(url, payload, format="json")
    +        self.assertEqual(response.status_code, 403, response.content)
    +        self.engagement.refresh_from_db()
    +        self.assertEqual(self.engagement.product, self.product_a)
    +
    +    # ── UI ────────────────────────────────────────────────────────────
    +
    +    def test_ui_move_to_authorized_product(self):
    +        """Edit engagement form moving to authorized product should succeed."""
    +        client = self._ui_client()
    +        url = reverse("edit_engagement", args=(self.engagement.id,))
    +        form_data = {
    +            "product": self.product_c.id,
    +            "target_start": datetime.date.today().strftime("%Y-%m-%d"),
    +            "target_end": datetime.date.today().strftime("%Y-%m-%d"),
    +            "lead": self.user.id,
    +            "status": "Not Started",
    +        }
    +        response = client.post(url, form_data)
    +        self.assertIn(response.status_code, [200, 302], response.content[:500])
    +        self.engagement.refresh_from_db()
    +        self.assertEqual(self.engagement.product, self.product_c)
    +
    +    def test_ui_move_to_unauthorized_product(self):
    +        """
    +        Edit engagement form moving to unauthorized product should be denied.
    +
    +        The form's product queryset is filtered to authorized products, so
    +        submitting an unauthorized product fails form validation (200 with
    +        errors) before the view-level permission check runs.  Either way the
    +        engagement must NOT move.
    +        """
    +        client = self._ui_client()
    +        url = reverse("edit_engagement", args=(self.engagement.id,))
    +        form_data = {
    +            "product": self.product_b.id,
    +            "target_start": datetime.date.today().strftime("%Y-%m-%d"),
    +            "target_end": datetime.date.today().strftime("%Y-%m-%d"),
    +            "lead": self.user.id,
    +            "status": "Not Started",
    +        }
    +        response = client.post(url, form_data)
    +        self.assertIn(response.status_code, [200, 403])
    +        self.engagement.refresh_from_db()
    +        self.assertEqual(self.engagement.product, self.product_a)
    diff --git a/unittests/test_product_announcements.py b/unittests/test_product_announcements.py
    new file mode 100644
    index 00000000000..d9aa39a191f
    --- /dev/null
    +++ b/unittests/test_product_announcements.py
    @@ -0,0 +1,214 @@
    +from collections import UserDict
    +
    +from django.http import HttpRequest, HttpResponse
    +from django.test import SimpleTestCase
    +
    +from dojo.product_announcements import (
    +    ErrorPageProductAnnouncement,
    +    LargeScanSizeProductAnnouncement,
    +    LongRunningRequestProductAnnouncement,
    +    ScanTypeProductAnnouncement,
    +)
    +
    +
    +class _SessionDict(UserDict):
    +
    +    """Minimal session stand-in that supports .get/.pop/[] like Django sessions."""
    +
    +
    +def _make_request():
    +    request = HttpRequest()
    +    request.session = _SessionDict()
    +    return request
    +
    +
    +def _make_response(data=None):
    +    response = HttpResponse()
    +    response.data = data if data is not None else {}
    +    return response
    +
    +
    +class TestProductAnnouncementSessionBanner(SimpleTestCase):
    +
    +    def test_stores_banner_in_session(self):
    +        request = _make_request()
    +        ErrorPageProductAnnouncement(request=request)
    +        banners = request.session["_product_banners"]
    +        self.assertEqual(len(banners), 1)
    +        self.assertEqual(banners[0]["source"], "product_announcement")
    +        self.assertEqual(banners[0]["style"], "info")
    +        self.assertIn("Pro comes with support.", banners[0]["message"])
    +        self.assertIsNone(banners[0]["expanded_html"])
    +
    +    def test_multiple_announcements_accumulate_in_session(self):
    +        request = _make_request()
    +        ErrorPageProductAnnouncement(request=request)
    +        ErrorPageProductAnnouncement(request=request)
    +        banners = request.session["_product_banners"]
    +        self.assertEqual(len(banners), 2)
    +
    +    def test_banner_message_contains_outreach_link(self):
    +        request = _make_request()
    +        ErrorPageProductAnnouncement(request=request)
    +        message = request.session["_product_banners"][0]["message"]
    +        self.assertIn("cloud.defectdojo.com", message)
    +        self.assertIn("Try today for free", message)
    +
    +    def test_session_error_is_swallowed(self):
    +        request = HttpRequest()
    +        request.session = None
    +        ErrorPageProductAnnouncement(request=request)
    +
    +    def test_no_settings_guard(self):
    +        """Product announcements fire without any settings check."""
    +        request = _make_request()
    +        ErrorPageProductAnnouncement(request=request)
    +        self.assertEqual(len(request.session["_product_banners"]), 1)
    +
    +
    +class TestProductAnnouncementApiPath(SimpleTestCase):
    +
    +    def test_api_response_gets_pro_key(self):
    +        response = _make_response(data={})
    +        ErrorPageProductAnnouncement(response=response)
    +        self.assertIn("pro", response.data)
    +        self.assertEqual(len(response.data["pro"]), 1)
    +        self.assertIn("Pro comes with support.", str(response.data["pro"][0]))
    +
    +    def test_api_response_appends_to_existing_pro_list(self):
    +        response = _make_response(data={"pro": ["existing"]})
    +        ErrorPageProductAnnouncement(response=response)
    +        self.assertEqual(len(response.data["pro"]), 2)
    +        self.assertEqual(response.data["pro"][0], "existing")
    +
    +    def test_api_response_data_dict_gets_pro_key(self):
    +        data = {}
    +        LargeScanSizeProductAnnouncement(response_data=data, duration=120.0)
    +        self.assertIn("pro", data)
    +
    +    def test_requires_at_least_one_target(self):
    +        with self.assertRaises(ValueError):
    +            ErrorPageProductAnnouncement()
    +
    +
    +class TestErrorPageProductAnnouncement(SimpleTestCase):
    +
    +    def test_message_content(self):
    +        request = _make_request()
    +        ErrorPageProductAnnouncement(request=request)
    +        message = request.session["_product_banners"][0]["message"]
    +        self.assertIn("Pro comes with support.", message)
    +
    +    def test_api_path(self):
    +        response = _make_response()
    +        ErrorPageProductAnnouncement(response=response)
    +        self.assertIn("Pro comes with support.", str(response.data["pro"][0]))
    +
    +
    +class TestLargeScanSizeProductAnnouncement(SimpleTestCase):
    +
    +    def test_fires_when_duration_exceeds_threshold(self):
    +        request = _make_request()
    +        LargeScanSizeProductAnnouncement(request=request, duration=120.0)
    +        banners = request.session["_product_banners"]
    +        self.assertEqual(len(banners), 1)
    +        self.assertIn("import took about 2 minute(s)", banners[0]["message"])
    +        self.assertIn("async imports", banners[0]["message"])
    +
    +    def test_does_not_fire_when_duration_below_threshold(self):
    +        request = _make_request()
    +        LargeScanSizeProductAnnouncement(request=request, duration=30.0)
    +        self.assertEqual(len(request.session.get("_product_banners", [])), 0)
    +
    +    def test_fires_at_boundary(self):
    +        request = _make_request()
    +        LargeScanSizeProductAnnouncement(request=request, duration=60.1)
    +        self.assertEqual(len(request.session["_product_banners"]), 1)
    +
    +    def test_does_not_fire_at_exact_threshold(self):
    +        request = _make_request()
    +        LargeScanSizeProductAnnouncement(request=request, duration=60.0)
    +        self.assertEqual(len(request.session.get("_product_banners", [])), 0)
    +
    +
    +class TestLongRunningRequestProductAnnouncement(SimpleTestCase):
    +
    +    def test_fires_when_duration_exceeds_threshold(self):
    +        request = _make_request()
    +        LongRunningRequestProductAnnouncement(request=request, duration=20.0)
    +        banners = request.session["_product_banners"]
    +        self.assertEqual(len(banners), 1)
    +        self.assertIn("performance tested", banners[0]["message"])
    +
    +    def test_does_not_fire_when_duration_below_threshold(self):
    +        request = _make_request()
    +        LongRunningRequestProductAnnouncement(request=request, duration=10.0)
    +        self.assertEqual(len(request.session.get("_product_banners", [])), 0)
    +
    +    def test_does_not_fire_at_exact_threshold(self):
    +        request = _make_request()
    +        LongRunningRequestProductAnnouncement(request=request, duration=15.0)
    +        self.assertEqual(len(request.session.get("_product_banners", [])), 0)
    +
    +
    +class TestScanTypeProductAnnouncement(SimpleTestCase):
    +
    +    def test_fires_for_supported_scan_type(self):
    +        request = _make_request()
    +        ScanTypeProductAnnouncement(request=request, scan_type="Snyk Scan")
    +        banners = request.session["_product_banners"]
    +        self.assertEqual(len(banners), 1)
    +        self.assertIn("Snyk Scan", banners[0]["message"])
    +        self.assertIn("no-code connector", banners[0]["message"])
    +
    +    def test_does_not_fire_for_unsupported_scan_type(self):
    +        request = _make_request()
    +        ScanTypeProductAnnouncement(request=request, scan_type="Unknown Scanner")
    +        self.assertEqual(len(request.session.get("_product_banners", [])), 0)
    +
    +    def test_does_not_fire_for_none_scan_type(self):
    +        request = _make_request()
    +        ScanTypeProductAnnouncement(request=request, scan_type=None)
    +        self.assertEqual(len(request.session.get("_product_banners", [])), 0)
    +
    +    def test_all_supported_scan_types_fire(self):
    +        for scan_type in ScanTypeProductAnnouncement.supported_scan_types:
    +            request = _make_request()
    +            ScanTypeProductAnnouncement(request=request, scan_type=scan_type)
    +            self.assertEqual(
    +                len(request.session["_product_banners"]), 1,
    +                f"Expected banner for {scan_type}",
    +            )
    +
    +    def test_api_path_for_supported_scan_type(self):
    +        data = {}
    +        ScanTypeProductAnnouncement(response_data=data, scan_type="Wiz Scan")
    +        self.assertIn("pro", data)
    +        self.assertIn("Wiz Scan", str(data["pro"][0]))
    +
    +
    +class TestBannerDictSchema(SimpleTestCase):
    +
    +    """Verify every banner stored in the session has the expected keys."""
    +
    +    EXPECTED_KEYS = {"source", "message", "style", "url", "link_text", "expanded_html"}
    +
    +    def test_error_page_banner_has_all_keys(self):
    +        request = _make_request()
    +        ErrorPageProductAnnouncement(request=request)
    +        self.assertEqual(set(request.session["_product_banners"][0].keys()), self.EXPECTED_KEYS)
    +
    +    def test_large_scan_banner_has_all_keys(self):
    +        request = _make_request()
    +        LargeScanSizeProductAnnouncement(request=request, duration=120.0)
    +        self.assertEqual(set(request.session["_product_banners"][0].keys()), self.EXPECTED_KEYS)
    +
    +    def test_long_running_banner_has_all_keys(self):
    +        request = _make_request()
    +        LongRunningRequestProductAnnouncement(request=request, duration=20.0)
    +        self.assertEqual(set(request.session["_product_banners"][0].keys()), self.EXPECTED_KEYS)
    +
    +    def test_scan_type_banner_has_all_keys(self):
    +        request = _make_request()
    +        ScanTypeProductAnnouncement(request=request, scan_type="Snyk Scan")
    +        self.assertEqual(set(request.session["_product_banners"][0].keys()), self.EXPECTED_KEYS)
    diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py
    index 95807d4c536..7199e08c126 100644
    --- a/unittests/test_rest_framework.py
    +++ b/unittests/test_rest_framework.py
    @@ -3429,6 +3429,95 @@ def test_create_not_authorized_product_name_engagement_name_scan_type_title(self
                 importer_mock.assert_not_called()
                 reimporter_mock.assert_not_called()
     
    +    # Security tests: verify that conflicting ID-based and name-based identifiers are rejected
    +
    +    @patch("dojo.importers.default_reimporter.DefaultReImporter.process_scan")
    +    @patch("dojo.importers.default_importer.DefaultImporter.process_scan")
    +    @patch("dojo.api_v2.permissions.user_has_permission")
    +    def test_reimport_engagement_param_ignored_permission_checked_on_name_resolved_target(self, mock, importer_mock, reimporter_mock):
    +        """
    +        Engagement is not a declared field on ReImportScanSerializer — verify
    +        the permission check uses the name-resolved target, not the engagement param.
    +        """
    +        mock.return_value = False
    +        importer_mock.return_value = IMPORTER_MOCK_RETURN_VALUE
    +        reimporter_mock.return_value = REIMPORTER_MOCK_RETURN_VALUE
    +
    +        with Path("tests/zap_sample.xml").open(encoding="utf-8") as testfile:
    +            payload = {
    +                "minimum_severity": "Low",
    +                "active": True,
    +                "verified": True,
    +                "scan_type": "ZAP Scan",
    +                "file": testfile,
    +                # engagement=1 belongs to Product 2 Engagement 1, but it should be ignored
    +                "engagement": 1,
    +                # These names resolve to Product 2's Engagement 4 -> Test 4
    +                "product_name": "Security How-to",
    +                "engagement_name": "April monthly engagement",
    +                "version": "1.0.0",
    +            }
    +            response = self.client.post(self.url, payload)
    +            self.assertEqual(403, response.status_code, response.content[:1000])
    +            # Permission must be checked on name-resolved Test 4 (in Engagement 4),
    +            # NOT on Test 3 (which belongs to the engagement=1 param)
    +            mock.assert_called_with(User.objects.get(username="admin"),
    +                Test.objects.get(id=4),
    +                Permissions.Import_Scan_Result)
    +            importer_mock.assert_not_called()
    +            reimporter_mock.assert_not_called()
    +
    +    @patch("dojo.importers.default_reimporter.DefaultReImporter.process_scan")
    +    @patch("dojo.importers.default_importer.DefaultImporter.process_scan")
    +    def test_reimport_with_test_id_mismatched_product_name_is_rejected(self, importer_mock, reimporter_mock):
    +        """Sending test ID from one product with product_name from another must be rejected."""
    +        importer_mock.return_value = IMPORTER_MOCK_RETURN_VALUE
    +        reimporter_mock.return_value = REIMPORTER_MOCK_RETURN_VALUE
    +
    +        with Path("tests/zap_sample.xml").open(encoding="utf-8") as testfile:
    +            payload = {
    +                "minimum_severity": "Low",
    +                "active": True,
    +                "verified": True,
    +                "scan_type": "ZAP Scan",
    +                "file": testfile,
    +                # Test 3 belongs to Engagement 1 -> Product 2 ("Security How-to")
    +                "test": 3,
    +                # But product_name points to Product 1 ("Python How-to")
    +                "product_name": "Python How-to",
    +                "version": "1.0.0",
    +            }
    +            response = self.client.post(self.url, payload)
    +            self.assertEqual(400, response.status_code, response.content[:1000])
    +            importer_mock.assert_not_called()
    +            reimporter_mock.assert_not_called()
    +
    +    @patch("dojo.importers.default_reimporter.DefaultReImporter.process_scan")
    +    @patch("dojo.importers.default_importer.DefaultImporter.process_scan")
    +    def test_reimport_with_test_id_mismatched_engagement_name_is_rejected(self, importer_mock, reimporter_mock):
    +        """Sending test ID from one engagement with engagement_name from another must be rejected."""
    +        importer_mock.return_value = IMPORTER_MOCK_RETURN_VALUE
    +        reimporter_mock.return_value = REIMPORTER_MOCK_RETURN_VALUE
    +
    +        with Path("tests/zap_sample.xml").open(encoding="utf-8") as testfile:
    +            payload = {
    +                "minimum_severity": "Low",
    +                "active": True,
    +                "verified": True,
    +                "scan_type": "ZAP Scan",
    +                "file": testfile,
    +                # Test 3 belongs to Engagement 1 ("1st Quarter Engagement")
    +                "test": 3,
    +                # But engagement_name points to a different engagement
    +                "product_name": "Security How-to",
    +                "engagement_name": "April monthly engagement",
    +                "version": "1.0.0",
    +            }
    +            response = self.client.post(self.url, payload)
    +            self.assertEqual(400, response.status_code, response.content[:1000])
    +            importer_mock.assert_not_called()
    +            reimporter_mock.assert_not_called()
    +
     
     @versioned_fixtures
     class ProductTypeTest(BaseClass.BaseClassTest):
    diff --git a/unittests/test_risk_acceptance.py b/unittests/test_risk_acceptance.py
    index c0613bd49ee..628624b6db1 100644
    --- a/unittests/test_risk_acceptance.py
    +++ b/unittests/test_risk_acceptance.py
    @@ -111,7 +111,7 @@ def test_remove_findings_from_risk_acceptance_findings_active(self):
             data = copy.copy(self.data_remove_finding_from_ra)
             data["remove_finding_id"] = 2
             ra = Risk_Acceptance.objects.last()
    -        response = self.client.post(reverse("view_risk_acceptance", args=(1, ra.id)), data)
    +        response = self.client.post(reverse("edit_risk_acceptance", args=(1, ra.id)), data)
             self.assertEqual(302, response.status_code, response.content[:1000])
             self.assert_all_active_not_risk_accepted(Finding.objects.filter(id=2))
             self.assert_all_inactive_risk_accepted(Finding.objects.filter(id=3))
    diff --git a/unittests/test_utils_ssrf.py b/unittests/test_utils_ssrf.py
    new file mode 100644
    index 00000000000..904abf8101c
    --- /dev/null
    +++ b/unittests/test_utils_ssrf.py
    @@ -0,0 +1,75 @@
    +import socket
    +from unittest.mock import patch
    +
    +import requests
    +
    +from dojo.utils_ssrf import SSRFError, _SSRFSafeAdapter, make_ssrf_safe_session, validate_url_for_ssrf  # noqa: PLC2701
    +from unittests.dojo_test_case import DojoTestCase
    +
    +
    +def _addr_info(ip, port=80):
    +    """Build a minimal getaddrinfo-style return value for a single IP."""
    +    return [(socket.AF_INET, socket.SOCK_STREAM, 6, "", (ip, port))]
    +
    +
    +_MIXED_ADDR_INFO = [
    +    (socket.AF_INET, socket.SOCK_STREAM, 6, "", ("8.8.8.8", 80)),
    +    (socket.AF_INET, socket.SOCK_STREAM, 6, "", ("192.168.1.1", 80)),
    +]
    +
    +
    +class TestValidateUrlForSsrf(DojoTestCase):
    +
    +    @patch("dojo.utils_ssrf.socket.getaddrinfo", return_value=_addr_info("8.8.8.8"))
    +    def test_valid_public_url_does_not_raise(self, mock_getaddrinfo):
    +        validate_url_for_ssrf("http://example.com/api")  # should not raise
    +
    +    def test_file_scheme_raises(self):
    +        with self.assertRaisesRegex(SSRFError, "not permitted"):
    +            validate_url_for_ssrf("file:///etc/passwd")
    +
    +    def test_gopher_scheme_raises(self):
    +        with self.assertRaisesRegex(SSRFError, "not permitted"):
    +            validate_url_for_ssrf("gopher://example.com")
    +
    +    def test_no_hostname_raises(self):
    +        with self.assertRaisesRegex(SSRFError, "no hostname"):
    +            validate_url_for_ssrf("http://")
    +
    +    def test_loopback_ip_raises(self):
    +        with self.assertRaisesRegex(SSRFError, "non-public address"):
    +            validate_url_for_ssrf("http://127.0.0.1/")
    +
    +    def test_private_class_c_raises(self):
    +        with self.assertRaisesRegex(SSRFError, "non-public address"):
    +            validate_url_for_ssrf("http://192.168.1.1/")
    +
    +    def test_private_class_a_raises(self):
    +        with self.assertRaisesRegex(SSRFError, "non-public address"):
    +            validate_url_for_ssrf("http://10.0.0.1/")
    +
    +    def test_link_local_raises(self):
    +        with self.assertRaisesRegex(SSRFError, "non-public address"):
    +            validate_url_for_ssrf("http://169.254.1.1/")
    +
    +    @patch("dojo.utils_ssrf.socket.getaddrinfo", side_effect=socket.gaierror("Name or service not known"))
    +    def test_unresolvable_hostname_raises(self, mock_getaddrinfo):
    +        with self.assertRaisesRegex(SSRFError, "Unable to resolve"):
    +            validate_url_for_ssrf("http://nonexistent.invalid/")
    +
    +    @patch("dojo.utils_ssrf.socket.getaddrinfo", return_value=_MIXED_ADDR_INFO)
    +    def test_multi_address_with_private_ip_raises(self, mock_getaddrinfo):
    +        with self.assertRaisesRegex(SSRFError, "non-public address"):
    +            validate_url_for_ssrf("http://example.com/")
    +
    +
    +class TestMakeSsrfSafeSession(DojoTestCase):
    +
    +    def test_returns_requests_session(self):
    +        session = make_ssrf_safe_session()
    +        self.assertIsInstance(session, requests.Session)
    +
    +    def test_http_and_https_mounted_with_safe_adapter(self):
    +        session = make_ssrf_safe_session()
    +        self.assertIsInstance(session.get_adapter("http://example.com"), _SSRFSafeAdapter)
    +        self.assertIsInstance(session.get_adapter("https://example.com"), _SSRFSafeAdapter)
    diff --git a/unittests/tools/test_govulncheck_parser.py b/unittests/tools/test_govulncheck_parser.py
    index 90499dc48fd..3b0a8efc474 100644
    --- a/unittests/tools/test_govulncheck_parser.py
    +++ b/unittests/tools/test_govulncheck_parser.py
    @@ -127,6 +127,8 @@ def test_parse_new_version_many_findings_custom_severity(self):
                     self.assertIsNotNone(finding.impact)
                     self.assertIsNotNone(finding.description)
                     self.assertIsNotNone(finding.references)
    +                self.assertTrue(finding.fix_available)
    +                self.assertEqual("0.3.8", finding.fix_version)
     
         def test_parse_issue_14642(self):
             with (get_unit_tests_scans_path("govulncheck") / "issue_14642.json").open(encoding="utf-8") as testfile:
    diff --git a/unittests/tools/test_risk_recon_parser.py b/unittests/tools/test_risk_recon_parser.py
    index c59b39bd7d9..a2fec88180d 100644
    --- a/unittests/tools/test_risk_recon_parser.py
    +++ b/unittests/tools/test_risk_recon_parser.py
    @@ -1,9 +1,10 @@
     import datetime
    -
    -import requests
    +from unittest.mock import MagicMock, patch
     
     from dojo.models import Test
    +from dojo.tools.risk_recon.api import RiskReconAPI
     from dojo.tools.risk_recon.parser import RiskReconParser
    +from dojo.utils_ssrf import SSRFError
     from unittests.dojo_test_case import DojoTestCase, get_unit_tests_scans_path
     
     
    @@ -11,7 +12,7 @@ class TestRiskReconAPIParser(DojoTestCase):
     
         def test_api_with_bad_url(self):
             with (get_unit_tests_scans_path("risk_recon") / "bad_url.json").open(encoding="utf-8") as testfile, \
    -          self.assertRaises(requests.exceptions.ConnectionError):
    +          self.assertRaises(Exception):  # noqa: B017  # SSRFError is caught and re-raised as Exception in api.py
                 parser = RiskReconParser()
                 parser.get_findings(testfile, Test())
     
    @@ -34,3 +35,20 @@ def test_parser_without_api(self):
                     finding = findings[1]
                     self.assertEqual(datetime.date(2017, 3, 17), finding.date.date())
                     self.assertEqual("ff2bbdbfc2b6gsrgwergwe6b1fasfwefb", finding.unique_id_from_tool)
    +
    +    @patch("dojo.tools.risk_recon.api.validate_url_for_ssrf", side_effect=SSRFError("blocked: private address"))
    +    def test_ssrf_error_is_raised_as_exception(self, mock_validate):
    +        with self.assertRaisesRegex(Exception, "Invalid Risk Recon API url"):
    +            RiskReconAPI(api_key="somekey", endpoint="http://192.168.1.1/api", data=[])
    +        mock_validate.assert_called_once_with("http://192.168.1.1/api")
    +
    +    @patch.object(RiskReconAPI, "get_findings")
    +    @patch.object(RiskReconAPI, "map_toes")
    +    @patch("dojo.tools.risk_recon.api.make_ssrf_safe_session")
    +    @patch("dojo.tools.risk_recon.api.validate_url_for_ssrf")
    +    def test_make_ssrf_safe_session_called_on_init(self, mock_validate, mock_make_session, mock_map_toes, mock_get_findings):
    +        mock_session = MagicMock()
    +        mock_make_session.return_value = mock_session
    +        api = RiskReconAPI(api_key="somekey", endpoint="https://api.riskrecon.com/v1", data=[])
    +        mock_make_session.assert_called_once()
    +        self.assertIs(api.session, mock_session)