Commit bf73492
feat(importers): apply import-time tags per batch before post-processing

Previously, import-time tags (apply_tags_to_findings / apply_tags_to_endpoints) were applied only after all findings had been processed and the post-processing tasks (deduplication, rules) had already been dispatched, so those tasks could not see the tags. Import-time tags are now applied per batch in process_findings(), immediately after parser tags and location persistence and before dojo_dispatch_task(), following the same pattern already used by bulk_apply_parser_tags. Closed findings are intentionally excluded: they are absent from the current report and should not receive the import tags.

Also renames the reimporter's ambiguous batch_findings (the match-batch slice of unsaved parser objects) to unsaved_findings_batch, freeing batch_findings for the new dedupe-batch accumulator and preventing an iterator-mutation bug.

Consolidates apply_import_tags() into apply_import_tags_for_batch(), removing a redundant DB re-query and an unused Iterable import. Performance test query counts are updated accordingly: each import loses one query from the removed post-loop apply_import_tags call.
1 parent 6788368 commit bf73492
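The heart of the change is ordering: the tags have to be on the findings before the post-processing task is dispatched, because that task reads them from the database. Below is a minimal sketch of the before/after shapes; chunks(), apply_tags(), and dispatch_post_processing() are hypothetical stand-ins for the importer internals, not the real DefectDojo helpers.

# Minimal sketch of the ordering fix; all names here are hypothetical
# stand-ins, not the real DefectDojo helpers.

def chunks(items, size=1000):
    for i in range(0, len(items), size):
        yield items[i:i + size]

def apply_tags(batch, tags):
    for finding in batch:
        finding.setdefault("tags", []).extend(tags)

def dispatch_post_processing(batch):
    """Placeholder for dojo_dispatch_task(); dedupe/rules read tags here."""

def import_old(findings, tags):
    # Old shape: every batch is dispatched first and tags arrive afterwards,
    # so the dispatched dedupe/rules tasks never see them.
    for batch in chunks(findings):
        dispatch_post_processing(batch)
    apply_tags(findings, tags)

def import_new(findings, tags):
    # New shape: tag each batch, then dispatch, so the tasks see the tags.
    for batch in chunks(findings):
        apply_tags(batch, tags)
        dispatch_post_processing(batch)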

4 files changed: 45 additions & 79 deletions


dojo/importers/base_importer.py

Lines changed: 13 additions & 45 deletions
@@ -1,7 +1,6 @@
 import base64
 import logging
 import time
-from collections.abc import Iterable
 
 from django.conf import settings
 from django.core.exceptions import ValidationError
@@ -337,67 +336,36 @@ def update_test_tags(self):
         if self.tags is not None and len(self.tags) > 0:
             self.test.tags.set(self.tags)
 
-    def apply_import_tags(
-        self,
-        new_findings: Iterable[Finding] | None = None,
-        closed_findings: Iterable[Finding] | None = None,
-        reactivated_findings: Iterable[Finding] | None = None,
-        untouched_findings: Iterable[Finding] | None = None,
-    ) -> None:
-        """Apply tags to findings and endpoints from an import operation."""
-        # Normalize None values to empty lists and convert sets/other iterables to lists
-        if untouched_findings is None:
-            untouched_findings = []
-        elif not isinstance(untouched_findings, list):
-            untouched_findings = list(untouched_findings)
-
-        if reactivated_findings is None:
-            reactivated_findings = []
-        elif not isinstance(reactivated_findings, list):
-            reactivated_findings = list(reactivated_findings)
-
-        if closed_findings is None:
-            closed_findings = []
-        elif not isinstance(closed_findings, list):
-            closed_findings = list(closed_findings)
-
-        if new_findings is None:
-            new_findings = []
-        elif not isinstance(new_findings, list):
-            new_findings = list(new_findings)
-
-        # Collect all affected findings
-        findings_to_tag = new_findings + closed_findings + reactivated_findings + untouched_findings
+    def apply_import_tags_for_batch(self, findings: list[Finding]) -> None:
+        """
+        Apply import-time tags to a batch of already-saved findings and their endpoints.
 
-        if not findings_to_tag:
+        Called per batch inside process_findings(), before post_process_findings_batch is
+        dispatched, so that rules/deduplication tasks see the import tags on the findings.
+        """
+        if not findings or not self.tags:
             return
-
-        # Add any tags to the findings imported if necessary
-        if self.apply_tags_to_findings and self.tags:
-            findings_qs = Finding.objects.filter(id__in=[f.id for f in findings_to_tag])
+        if self.apply_tags_to_findings:
             try:
                 bulk_add_tags_to_instances(
                     tag_or_tags=self.tags,
-                    instances=findings_qs,
+                    instances=findings,
                     tag_field_name="tags",
                 )
             except IntegrityError:
-                # Fallback to safe per-instance tagging if concurrent deletes occur
-                for finding in findings_to_tag:
+                for finding in findings:
                     for tag in self.tags:
                         self.add_tags_safe(finding, tag)
-
-        # Add any tags to any locations/endpoints of the findings imported if necessary
-        if self.apply_tags_to_endpoints and self.tags:
-            locations_qs = self.location_handler.get_locations_for_tagging(findings_to_tag)
+        if self.apply_tags_to_endpoints:
+            locations_qs = self.location_handler.get_locations_for_tagging(findings)
             try:
                 bulk_add_tags_to_instances(
                     tag_or_tags=self.tags,
                     instances=locations_qs,
                     tag_field_name="tags",
                 )
             except IntegrityError:
-                for finding in findings_to_tag:
+                for finding in findings:
                     for location in self.location_handler.get_location_tag_fallback(finding):
                         for tag in self.tags:
                             self.add_tags_safe(location, tag)
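The dropped Finding.objects.filter(...) re-query above is where the one-query saving in the performance tests comes from: the batch already holds saved instances, so there is nothing to re-fetch. A standalone illustration of the difference, using an in-memory stand-in for the ORM rather than the real models:

# Standalone sketch of the removed re-query; FakeTable stands in for the
# ORM and the findings are plain dicts, not Finding instances.

class FakeTable:
    db_hits = 0

    def __init__(self, rows):
        self.rows = rows

    def filter_by_ids(self, ids):
        FakeTable.db_hits += 1  # the redundant round trip, once per import
        return [row for row in self.rows if row["id"] in ids]

def tag_instances(instances, tags):
    for obj in instances:
        obj.setdefault("tags", []).extend(tags)

findings = [{"id": 1}, {"id": 2}]      # already saved, already in memory
table = FakeTable(findings)

# Old apply_import_tags: re-fetch rows the caller already holds.
tag_instances(table.filter_by_ids({1, 2}), ["import-tag"])
assert FakeTable.db_hits == 1

# New apply_import_tags_for_batch: tag the in-memory batch directly.
tag_instances(findings, ["import-tag"])
assert FakeTable.db_hits == 1          # no additional query issued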

dojo/importers/default_importer.py

Lines changed: 5 additions & 5 deletions
@@ -132,11 +132,6 @@ def process_scan(
             new_findings=new_findings,
             closed_findings=closed_findings,
         )
-        # Apply tags to findings and endpoints/locations
-        self.apply_import_tags(
-            new_findings=new_findings,
-            closed_findings=closed_findings,
-        )
         # Send out some notifications to the user
         logger.debug("IMPORT_SCAN: Generating notifications")
         dojo_dispatch_task(
@@ -169,6 +164,7 @@ def process_findings(
     ) -> list[Finding]:
         # Batched post-processing (no chord): dispatch a task per 1000 findings or on final finding
         batch_finding_ids: list[int] = []
+        batch_findings: list[Finding] = []
         batch_max_size = getattr(settings, "IMPORT_REIMPORT_DEDUPE_BATCH_SIZE", 1000)
 
         """
@@ -259,6 +255,7 @@ def process_findings(
             push_to_jira = self.push_to_jira and ((not self.findings_groups_enabled or not self.group_by) or not finding_will_be_grouped)
             logger.debug("process_findings: computed push_to_jira=%s", push_to_jira)
             batch_finding_ids.append(finding.id)
+            batch_findings.append(finding)
 
             # If batch is full or we're at the end, persist locations/endpoints and dispatch
             if len(batch_finding_ids) >= batch_max_size or is_final_finding:
@@ -267,6 +264,9 @@ def process_findings(
                 # so rules/deduplication tasks see the tags already on the findings.
                 bulk_apply_parser_tags(findings_with_parser_tags)
                 findings_with_parser_tags.clear()
+                # Apply import-time tags before post-processing so rules/deduplication see them.
+                self.apply_import_tags_for_batch(batch_findings)
+                batch_findings.clear()
                 finding_ids_batch = list(batch_finding_ids)
                 batch_finding_ids.clear()
                 logger.debug("process_findings: dispatching batch with push_to_jira=%s (batch_size=%d, is_final=%s)",

dojo/importers/default_reimporter.py

Lines changed: 15 additions & 17 deletions
@@ -137,13 +137,6 @@ def process_scan(
             reactivated_findings=reactivated_findings,
             untouched_findings=untouched_findings,
         )
-        # Apply tags to findings and endpoints
-        self.apply_import_tags(
-            new_findings=new_findings,
-            closed_findings=closed_findings,
-            reactivated_findings=reactivated_findings,
-            untouched_findings=untouched_findings,
-        )
         # Send out som notifications to the user
         logger.debug("REIMPORT_SCAN: Generating notifications")
         updated_count = (
@@ -173,7 +166,7 @@ def process_scan(
 
     def get_reimport_match_candidates_for_batch(
         self,
-        batch_findings: list[Finding],
+        unsaved_findings_batch: list[Finding],
     ) -> tuple[dict, dict, dict]:
         """
         Fetch candidate matches for a batch of *unsaved* findings during reimport.
@@ -195,23 +188,23 @@ def get_reimport_match_candidates_for_batch(
         if self.deduplication_algorithm == "hash_code":
             candidates_by_hash = find_candidates_for_deduplication_hash(
                 self.test,
-                batch_findings,
+                unsaved_findings_batch,
                 mode="reimport",
             )
         elif self.deduplication_algorithm == "unique_id_from_tool":
             candidates_by_uid = find_candidates_for_deduplication_unique_id(
                 self.test,
-                batch_findings,
+                unsaved_findings_batch,
                 mode="reimport",
             )
         elif self.deduplication_algorithm == "unique_id_from_tool_or_hash_code":
             candidates_by_uid, candidates_by_hash = find_candidates_for_deduplication_uid_or_hash(
                 self.test,
-                batch_findings,
+                unsaved_findings_batch,
                 mode="reimport",
             )
         elif self.deduplication_algorithm == "legacy":
-            candidates_by_key = find_candidates_for_reimport_legacy(self.test, batch_findings)
+            candidates_by_key = find_candidates_for_reimport_legacy(self.test, unsaved_findings_batch)
 
         return candidates_by_hash, candidates_by_uid, candidates_by_key
 
@@ -308,6 +301,7 @@ def process_findings(
             cleaned_findings.append(sanitized)
 
         batch_finding_ids: list[int] = []
+        batch_findings: list[Finding] = []
         findings_with_parser_tags: list[tuple] = []
         # Batch size for deduplication/post-processing (only new findings)
         dedupe_batch_max_size = getattr(settings, "IMPORT_REIMPORT_DEDUPE_BATCH_SIZE", 1000)
@@ -318,13 +312,13 @@ def process_findings(
         # This avoids the 1+N query problem by fetching all candidates for a batch at once
        for batch_start in range(0, len(cleaned_findings), match_batch_max_size):
             batch_end = min(batch_start + match_batch_max_size, len(cleaned_findings))
-            batch_findings = cleaned_findings[batch_start:batch_end]
+            unsaved_findings_batch = cleaned_findings[batch_start:batch_end]
             is_final_batch = batch_end == len(cleaned_findings)
 
             logger.debug(f"Processing reimport batch {batch_start}-{batch_end} of {len(cleaned_findings)} findings")
 
             # Prepare findings in batch: set test, service, calculate hash codes
-            for unsaved_finding in batch_findings:
+            for unsaved_finding in unsaved_findings_batch:
                 # Some parsers provide "mitigated" field but do not set timezone (because they are probably not available in the report)
                 # Finding.mitigated is DateTimeField and it requires timezone
                 if unsaved_finding.mitigated and not unsaved_finding.mitigated.tzinfo:
@@ -342,12 +336,12 @@ def process_findings(
 
             # Fetch all candidates for this batch at once (batch candidate finding)
             candidates_by_hash, candidates_by_uid, candidates_by_key = self.get_reimport_match_candidates_for_batch(
-                batch_findings,
+                unsaved_findings_batch,
             )
 
             # Process each finding in the batch using pre-fetched candidates
-            for idx, unsaved_finding in enumerate(batch_findings):
-                is_final = is_final_batch and idx == len(batch_findings) - 1
+            for idx, unsaved_finding in enumerate(unsaved_findings_batch):
+                is_final = is_final_batch and idx == len(unsaved_findings_batch) - 1
 
                 # Match any findings to this new one coming in using pre-fetched candidates
                 matched_findings = self.match_finding_to_candidate_reimport(
@@ -403,6 +397,7 @@ def process_findings(
                 # all data is already saved on the finding, we only need to trigger post processing in batches
                 push_to_jira = self.push_to_jira and ((not self.findings_groups_enabled or not self.group_by) or not finding_will_be_grouped)
                 batch_finding_ids.append(finding.id)
+                batch_findings.append(finding)
 
                 # Post-processing batches (deduplication, rules, etc.) are separate from matching batches.
                 # These batches only contain "new" findings that were saved (not matched to existing findings).
@@ -425,6 +420,9 @@ def process_findings(
                     # so rules/deduplication tasks see the tags already on the findings.
                     bulk_apply_parser_tags(findings_with_parser_tags)
                     findings_with_parser_tags.clear()
+                    # Apply import-time tags before post-processing so rules/deduplication see them.
+                    self.apply_import_tags_for_batch(batch_findings)
+                    batch_findings.clear()
                     finding_ids_batch = list(batch_finding_ids)
                     batch_finding_ids.clear()
                     dojo_dispatch_task(
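The rename is not cosmetic. Had the dedupe accumulator reused the name batch_findings, the flush inside the loop would have cleared the very list being iterated, silently dropping the tail of each match batch. A minimal reproduction of that failure mode (simplified stand-in code, not the reimporter itself):

# Minimal reproduction of the iterator-mutation bug the rename prevents;
# simplified stand-in code, not the actual reimporter.

cleaned = [f"finding-{i}" for i in range(5)]

# Buggy shape: one name serves as both the match-batch slice (iterated)
# and the dedupe accumulator (appended to and cleared mid-loop).
batch_findings = cleaned[0:5]
visited = []
for idx, item in enumerate(batch_findings):
    visited.append(item)
    batch_findings.append(item)         # accumulate for "dispatch"
    if idx == 2:
        batch_findings.clear()          # flush clears the iterated list
assert visited == cleaned[:3]           # the last two findings are skipped

# Fixed shape: distinct names, as in this commit.
unsaved_findings_batch = cleaned[0:5]
batch_findings = []
visited = []
for idx, item in enumerate(unsaved_findings_batch):
    visited.append(item)
    batch_findings.append(item)
    if idx == 2:
        batch_findings.clear()          # safe: a different list
assert visited == cleaned               # every finding is processed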

unittests/test_importers_performance.py

Lines changed: 12 additions & 12 deletions
@@ -343,9 +343,9 @@ def test_import_reimport_reimport_performance_pghistory_async(self):
         configure_pghistory_triggers()
 
         self._import_reimport_performance(
-            expected_num_queries1=172,
+            expected_num_queries1=171,
             expected_num_async_tasks1=2,
-            expected_num_queries2=125,
+            expected_num_queries2=124,
             expected_num_async_tasks2=1,
             expected_num_queries3=29,
             expected_num_async_tasks3=1,
@@ -367,9 +367,9 @@ def test_import_reimport_reimport_performance_pghistory_no_async(self):
         testuser.usercontactinfo.save()
 
         self._import_reimport_performance(
-            expected_num_queries1=188,
+            expected_num_queries1=187,
             expected_num_async_tasks1=2,
-            expected_num_queries2=133,
+            expected_num_queries2=132,
             expected_num_async_tasks2=1,
             expected_num_queries3=37,
             expected_num_async_tasks3=1,
@@ -392,9 +392,9 @@ def test_import_reimport_reimport_performance_pghistory_no_async_with_product_gr
         self.system_settings(enable_product_grade=True)
 
         self._import_reimport_performance(
-            expected_num_queries1=198,
+            expected_num_queries1=197,
             expected_num_async_tasks1=4,
-            expected_num_queries2=143,
+            expected_num_queries2=142,
             expected_num_async_tasks2=3,
             expected_num_queries3=44,
             expected_num_async_tasks3=3,
@@ -633,9 +633,9 @@ def test_import_reimport_reimport_performance_pghistory_async(self):
         configure_pghistory_triggers()
 
         self._import_reimport_performance(
-            expected_num_queries1=179,
+            expected_num_queries1=178,
             expected_num_async_tasks1=2,
-            expected_num_queries2=134,
+            expected_num_queries2=133,
             expected_num_async_tasks2=1,
             expected_num_queries3=37,
             expected_num_async_tasks3=1,
@@ -657,9 +657,9 @@ def test_import_reimport_reimport_performance_pghistory_no_async(self):
         testuser.usercontactinfo.save()
 
         self._import_reimport_performance(
-            expected_num_queries1=197,
+            expected_num_queries1=196,
             expected_num_async_tasks1=2,
-            expected_num_queries2=144,
+            expected_num_queries2=143,
             expected_num_async_tasks2=1,
             expected_num_queries3=47,
             expected_num_async_tasks3=1,
@@ -682,9 +682,9 @@ def test_import_reimport_reimport_performance_pghistory_no_async_with_product_gr
         self.system_settings(enable_product_grade=True)
 
         self._import_reimport_performance(
-            expected_num_queries1=210,
+            expected_num_queries1=209,
             expected_num_async_tasks1=4,
-            expected_num_queries2=157,
+            expected_num_queries2=156,
             expected_num_async_tasks2=3,
             expected_num_queries3=54,
             expected_num_async_tasks3=3,
