Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions dojo/finding/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -726,11 +726,18 @@ def bulk_clear_finding_m2m(finding_qs):
# Remove tags with proper count maintenance
bulk_remove_all_tags(Finding, finding_ids)

# Auto-discover and delete remaining (non-tag) M2M through tables
for m2m_field in Finding._meta.many_to_many:
if hasattr(m2m_field, "tag_options"):
# Auto-discover and delete M2M through tables — both forward (Finding._meta.many_to_many)
# and reverse (other models with ManyToManyField pointing to Finding, e.g. Finding_Group.findings).
# Forward M2M fields use field.remote_field.through, reverse use field.through.
m2m_through_models = set()
for field_info in Finding._meta.get_fields():
if hasattr(field_info, "tag_options"):
continue
through_model = m2m_field.remote_field.through
through = getattr(field_info, "through", None) or getattr(getattr(field_info, "remote_field", None), "through", None)
if through is not None:
m2m_through_models.add(through)

for through_model in m2m_through_models:
# Find the FK column that points to Finding
fk_column = None
for field in through_model._meta.get_fields():
Expand Down
47 changes: 26 additions & 21 deletions dojo/utils_cascade_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,32 @@ def cascade_delete_related_objects(from_model, instance_pk_query, skip_relations

bulk_remove_all_tags(from_model, instance_pk_query)

for m2m_field in from_model._meta.many_to_many:
# Skip tag fields — handled by bulk_remove_all_tags above
if hasattr(m2m_field, "tag_options"):
continue
# Skip if caller already cleaned M2M for this model
if from_model in skip_m2m_for:
continue
through_model = m2m_field.remote_field.through
fk_column = None
for field in through_model._meta.get_fields():
if hasattr(field, "related_model") and field.related_model is from_model:
fk_column = field.column
break
if fk_column:
filterspec_m2m = {f"{fk_column}__in": models.Subquery(instance_pk_query)}
m2m_count = execute_delete_sql(through_model.objects.filter(**filterspec_m2m))
if m2m_count:
logger.debug(
"cascade_delete: cleared %d rows from M2M %s",
m2m_count, through_model._meta.db_table,
)
# Clear all M2M through tables — both forward (from_model._meta.many_to_many)
# and reverse (other models with ManyToManyField pointing to from_model).
# Forward M2M fields use field.remote_field.through, reverse use field.through.
if from_model not in skip_m2m_for:
m2m_through_models = set()
for field_info in from_model._meta.get_fields():
if hasattr(field_info, "tag_options"):
continue
through = getattr(field_info, "through", None) or getattr(getattr(field_info, "remote_field", None), "through", None)
if through is not None:
m2m_through_models.add(through)

for through_model in m2m_through_models:
fk_column = None
for field in through_model._meta.get_fields():
if hasattr(field, "related_model") and field.related_model is from_model:
fk_column = field.column
break
if fk_column:
filterspec_m2m = {f"{fk_column}__in": models.Subquery(instance_pk_query)}
m2m_count = execute_delete_sql(through_model.objects.filter(**filterspec_m2m))
if m2m_count:
logger.debug(
"cascade_delete: cleared %d rows from M2M %s",
m2m_count, through_model._meta.db_table,
)

# At level 0, do NOT delete root records — the caller handles that
# (e.g. via ORM obj.delete() to fire Django signals).
Expand Down
63 changes: 62 additions & 1 deletion unittests/test_prepare_duplicates_for_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,19 @@
from django.utils import timezone

from dojo.finding.helper import prepare_duplicates_for_delete
from dojo.models import Engagement, Finding, Product, Product_Type, Test, Test_Type, User, UserContactInfo
from dojo.models import (
Dojo_User,
Engagement,
Finding,
Finding_Group,
Product,
Product_Type,
Risk_Acceptance,
Test,
Test_Type,
User,
UserContactInfo,
)

from .dojo_test_case import DojoTestCase

Expand Down Expand Up @@ -409,3 +421,52 @@ def test_delete_product_with_tags(self):
self.assertEqual(product_shared_tag.count, 0)
finding_shared_tag.refresh_from_db()
self.assertEqual(finding_shared_tag.count, 0)

def test_delete_product_with_reverse_m2m_relations(self):
"""
Deleting a product with findings that have reverse M2M relations succeeds.

Reverse M2M through tables (M2M fields on other models pointing to Finding)
must be cleared before findings are deleted. This tests:
- Finding_Group.findings (dojo_finding_group_findings)
- Risk_Acceptance.accepted_findings (dojo_risk_acceptance_accepted_findings)
"""
from dojo.utils import async_delete # noqa: PLC0415

finding_a = self._create_finding(self.test1, "Grouped Finding A")
finding_b = self._create_finding(self.test1, "Grouped Finding B")
finding_c = self._create_finding(self.test1, "Risk Accepted Finding")

# Finding_Group with findings
creator = Dojo_User.objects.first() or Dojo_User.objects.create(username="testcreator")
group = Finding_Group.objects.create(
name="Test Group",
test=self.test1,
creator=creator,
)
group.findings.add(finding_a, finding_b)

# Risk_Acceptance with accepted findings
ra = Risk_Acceptance.objects.create(
name="Test RA",
owner=self.testuser,
)
ra.accepted_findings.add(finding_c)
# Link to engagement so we can verify it survives
self.engagement1.risk_acceptance.add(ra)

product_id = self.product.id
group_id = group.id
ra_id = ra.id

with impersonate(self.testuser):
async_del = async_delete()
async_del.delete(self.product)

self.assertFalse(Product.objects.filter(id=product_id).exists())
self.assertFalse(Finding_Group.objects.filter(id=group_id).exists())
self.assertFalse(Finding.objects.filter(id__in=[finding_a.id, finding_b.id, finding_c.id]).exists())
# Risk_Acceptance itself survives (no FK to product/engagement),
# but its accepted_findings M2M entries should be gone
self.assertTrue(Risk_Acceptance.objects.filter(id=ra_id).exists())
self.assertEqual(Risk_Acceptance.objects.get(id=ra_id).accepted_findings.count(), 0)
Loading