diff --git a/dojo/finding/helper.py b/dojo/finding/helper.py index 18ddeba98e2..6c89c15b20c 100644 --- a/dojo/finding/helper.py +++ b/dojo/finding/helper.py @@ -726,11 +726,18 @@ def bulk_clear_finding_m2m(finding_qs): # Remove tags with proper count maintenance bulk_remove_all_tags(Finding, finding_ids) - # Auto-discover and delete remaining (non-tag) M2M through tables - for m2m_field in Finding._meta.many_to_many: - if hasattr(m2m_field, "tag_options"): + # Auto-discover and delete M2M through tables — both forward (Finding._meta.many_to_many) + # and reverse (other models with ManyToManyField pointing to Finding, e.g. Finding_Group.findings). + # Forward M2M fields use field.remote_field.through, reverse use field.through. + m2m_through_models = set() + for field_info in Finding._meta.get_fields(): + if hasattr(field_info, "tag_options"): continue - through_model = m2m_field.remote_field.through + through = getattr(field_info, "through", None) or getattr(getattr(field_info, "remote_field", None), "through", None) + if through is not None: + m2m_through_models.add(through) + + for through_model in m2m_through_models: # Find the FK column that points to Finding fk_column = None for field in through_model._meta.get_fields(): diff --git a/dojo/utils_cascade_delete.py b/dojo/utils_cascade_delete.py index b8c0ee4af66..044c45a539e 100644 --- a/dojo/utils_cascade_delete.py +++ b/dojo/utils_cascade_delete.py @@ -159,27 +159,32 @@ def cascade_delete_related_objects(from_model, instance_pk_query, skip_relations bulk_remove_all_tags(from_model, instance_pk_query) - for m2m_field in from_model._meta.many_to_many: - # Skip tag fields — handled by bulk_remove_all_tags above - if hasattr(m2m_field, "tag_options"): - continue - # Skip if caller already cleaned M2M for this model - if from_model in skip_m2m_for: - continue - through_model = m2m_field.remote_field.through - fk_column = None - for field in through_model._meta.get_fields(): - if hasattr(field, "related_model") and field.related_model is from_model: - fk_column = field.column - break - if fk_column: - filterspec_m2m = {f"{fk_column}__in": models.Subquery(instance_pk_query)} - m2m_count = execute_delete_sql(through_model.objects.filter(**filterspec_m2m)) - if m2m_count: - logger.debug( - "cascade_delete: cleared %d rows from M2M %s", - m2m_count, through_model._meta.db_table, - ) + # Clear all M2M through tables — both forward (from_model._meta.many_to_many) + # and reverse (other models with ManyToManyField pointing to from_model). + # Forward M2M fields use field.remote_field.through, reverse use field.through. + if from_model not in skip_m2m_for: + m2m_through_models = set() + for field_info in from_model._meta.get_fields(): + if hasattr(field_info, "tag_options"): + continue + through = getattr(field_info, "through", None) or getattr(getattr(field_info, "remote_field", None), "through", None) + if through is not None: + m2m_through_models.add(through) + + for through_model in m2m_through_models: + fk_column = None + for field in through_model._meta.get_fields(): + if hasattr(field, "related_model") and field.related_model is from_model: + fk_column = field.column + break + if fk_column: + filterspec_m2m = {f"{fk_column}__in": models.Subquery(instance_pk_query)} + m2m_count = execute_delete_sql(through_model.objects.filter(**filterspec_m2m)) + if m2m_count: + logger.debug( + "cascade_delete: cleared %d rows from M2M %s", + m2m_count, through_model._meta.db_table, + ) # At level 0, do NOT delete root records — the caller handles that # (e.g. via ORM obj.delete() to fire Django signals). diff --git a/unittests/test_prepare_duplicates_for_delete.py b/unittests/test_prepare_duplicates_for_delete.py index de786c50ad6..3630f287fdf 100644 --- a/unittests/test_prepare_duplicates_for_delete.py +++ b/unittests/test_prepare_duplicates_for_delete.py @@ -13,7 +13,19 @@ from django.utils import timezone from dojo.finding.helper import prepare_duplicates_for_delete -from dojo.models import Engagement, Finding, Product, Product_Type, Test, Test_Type, User, UserContactInfo +from dojo.models import ( + Dojo_User, + Engagement, + Finding, + Finding_Group, + Product, + Product_Type, + Risk_Acceptance, + Test, + Test_Type, + User, + UserContactInfo, +) from .dojo_test_case import DojoTestCase @@ -409,3 +421,52 @@ def test_delete_product_with_tags(self): self.assertEqual(product_shared_tag.count, 0) finding_shared_tag.refresh_from_db() self.assertEqual(finding_shared_tag.count, 0) + + def test_delete_product_with_reverse_m2m_relations(self): + """ + Deleting a product with findings that have reverse M2M relations succeeds. + + Reverse M2M through tables (M2M fields on other models pointing to Finding) + must be cleared before findings are deleted. This tests: + - Finding_Group.findings (dojo_finding_group_findings) + - Risk_Acceptance.accepted_findings (dojo_risk_acceptance_accepted_findings) + """ + from dojo.utils import async_delete # noqa: PLC0415 + + finding_a = self._create_finding(self.test1, "Grouped Finding A") + finding_b = self._create_finding(self.test1, "Grouped Finding B") + finding_c = self._create_finding(self.test1, "Risk Accepted Finding") + + # Finding_Group with findings + creator = Dojo_User.objects.first() or Dojo_User.objects.create(username="testcreator") + group = Finding_Group.objects.create( + name="Test Group", + test=self.test1, + creator=creator, + ) + group.findings.add(finding_a, finding_b) + + # Risk_Acceptance with accepted findings + ra = Risk_Acceptance.objects.create( + name="Test RA", + owner=self.testuser, + ) + ra.accepted_findings.add(finding_c) + # Link to engagement so we can verify it survives + self.engagement1.risk_acceptance.add(ra) + + product_id = self.product.id + group_id = group.id + ra_id = ra.id + + with impersonate(self.testuser): + async_del = async_delete() + async_del.delete(self.product) + + self.assertFalse(Product.objects.filter(id=product_id).exists()) + self.assertFalse(Finding_Group.objects.filter(id=group_id).exists()) + self.assertFalse(Finding.objects.filter(id__in=[finding_a.id, finding_b.id, finding_c.id]).exists()) + # Risk_Acceptance itself survives (no FK to product/engagement), + # but its accepted_findings M2M entries should be gone + self.assertTrue(Risk_Acceptance.objects.filter(id=ra_id).exists()) + self.assertEqual(Risk_Acceptance.objects.get(id=ra_id).accepted_findings.count(), 0)