Commit f69b0f2

perf(watson): prefetch relations + force async indexing (#14881)
* perf(watson): prefetch relations + force async indexing

Watson's SearchAdapter resolves __-separated relation paths via per-instance getattr, triggering an N+1 query storm during async indexing. For Finding (test__engagement__product__name + jira_issue__jira_key) and Vulnerability_Id (finding__test__engagement__product__name) on a 1000-row batch this adds thousands of extra queries per task.

dojo/utils_watson_prefetch.py auto-derives select_related / prefetch_related paths from each adapter's fields/store by walking model._meta, then applies them in update_watson_search_index_for_model. Toggle: DD_WATSON_INDEX_PREFETCH_ENABLED (default True). On any error we log loudly and fall back to the plain queryset so indexing still completes.

Also adds force_async=True to dojo_dispatch_task / we_want_async. This keeps the watson indexer in the background even when the caller is a block_execution=True user, since index updates are slow and never need to be synchronous from the user's perspective.

Tests:
- unittests/test_watson_index_prefetch.py (10 tests): path classification for Product/Finding/Vulnerability_Id/Endpoint, unknown-path drop, setting toggle, derivation-raise fallback with log assertion.
- unittests/test_celery_dispatch_force_async.py (4 tests): force_async precedence over sync=True and block_execution.

* test(watson): fix CI failures from watson prefetch + force_async

- test_tag_inheritance_perf: update V2/V3 import baselines (-52 each) to reflect adapter-derived select_related/prefetch_related in the async watson indexer running inline under CELERY_TASK_ALWAYS_EAGER.
- test_watson_async_search_index: add CELERY_TASK_ALWAYS_EAGER=True to the threshold=0 case. force_async=True now always dispatches via apply_async; without eager mode the task never runs and the index stays empty.

* perf(watson): intermediate flush + always-async index dispatch

Wrap watson.search_context_manager.add_to_context with a size-based hook that drains the per-request context to async celery tasks as soon as it reaches WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE, instead of waiting for end-of-request. Bounds in-memory growth on long-running imports and lets celery workers start indexing batches earlier (parallel fanout). Hook installed once in dojo.apps.ready(). BATCH_SIZE doubles as threshold; set to 0/negative to disable the intermediate flush.

Drop WATSON_ASYNC_INDEX_UPDATE_THRESHOLD: index dispatch is now unconditionally async. Removes the sub-threshold sync branch (which blocked the request on _bulk_save_search_entries) and the disable-async path.

Consolidate _extract_tasks_for_async + _trigger_async_index_update + _dispatch_async_index_batches + _flush_search_context_intermediate into one helper `_drain_search_context_to_async` that groups, dispatches, and discards entries from the set in place. With the set drained, watson's end() bulk-saves an empty iterator, so no explicit invalidate() is needed.

Tests:
- test_watson_intermediate_flush: (new) drain dispatches + clears, threshold-triggered hook, threshold=0 disables, invalid context skips.
- test_watson_async_search_index: collapse three threshold-variant tests into one, class-level CELERY_TASK_ALWAYS_EAGER=True.
- test_tag_inheritance_perf: reimport no-change baselines V2 69→74, V3 87→92 (always-async path adds 5 queries vs prior sub-threshold sync branch).

* upgrade notes

* test(watson): query-count assertion for prefetch helper

Lock in the N+1 elimination claim directly with CaptureQueriesContext; previously it was only observed indirectly via the ZAP import perf test.

* test(watson): supply Product.description in flush hook fixtures

CI runs the V3_FEATURE_LOCATIONS=True matrix where BaseModel.save calls full_clean. Product.description is blank=False, so the bare fixture ValidationErrors out. Local default (V3 off) skips validation, masking this in the prior run.
1 parent 758be64 commit f69b0f2

13 files changed

Lines changed: 616 additions & 88 deletions

docs/content/releases/os_upgrading/2.59.md

Lines changed: 5 additions & 0 deletions
@@ -88,4 +88,9 @@ As announced in DefectDojo 2.57.0, the Stub Findings feature has been removed. T
 
 Any requests to this endpoint will now return a 404 Not Found error. The Stub Findings UI is no longer available.
 
+## Configuration change in Watson Search Indexing
+
+In [PR 14881](https://github.com/DefectDojo/django-DefectDojo/pull/14881), we optimized the way the Django Watson search index is updated during imports and reimports. There is now a single configuration setting to manage the threshold: `DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE`. The default value should work fine for most instances.
+
+
 For more information, check the [Release Notes](https://github.com/DefectDojo/django-DefectDojo/releases/tag/2.59.0).

dojo/apps.py

Lines changed: 3 additions & 0 deletions
@@ -106,6 +106,9 @@ def ready(self):
         register_django_pghistory_models()
         configure_audit_system()
 
+        from dojo.middleware import install_intermediate_flush_hook  # noqa: PLC0415
+        install_intermediate_flush_hook()
+
 
 def get_model_fields_with_extra(model, extra_fields=()):
     return get_model_fields(get_model_default_fields(model), extra_fields)

dojo/celery_dispatch.py

Lines changed: 3 additions & 0 deletions
@@ -62,6 +62,9 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur
     - Inject `async_user_id` if missing.
     - Capture and inject pghistory context if available.
     - Respect `force_sync=True` (foreground execution) and user `block_execution`.
+    - Respect `force_async=True` (background execution even when the caller
+      would otherwise run synchronously, e.g. user has `block_execution`).
+      `force_async` wins over `force_sync` and `block_execution`.
     - Support `countdown=<seconds>` for async dispatch.
 
     Returns:

dojo/decorators.py

Lines changed: 5 additions & 0 deletions
@@ -58,6 +58,11 @@ def get_tasks(self):
 def we_want_async(*args, func=None, **kwargs):
     from dojo.utils import get_current_user  # noqa: PLC0415 circular import
 
+    force_async = kwargs.get("force_async", False)
+    if force_async:
+        logger.debug("dojo_async_task %s: running task in the background as force_async=True has been found as kwarg", func)
+        return True
+
     force_sync = kwargs.get("force_sync", False)
     if force_sync:
         logger.debug("dojo_async_task %s: running task in the foreground as force_sync=True has been found as kwarg", func)
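The precedence described in the docstring and implemented by the early return above can be sketched in isolation. This is a minimal illustration, not the project's function: `wants_async` and its `user_blocks_execution` parameter are hypothetical names, and the final fallback (a `block_execution` user runs in the foreground) is an assumption based on the docstring rather than on code shown in this diff.

```python
def wants_async(user_blocks_execution: bool = False, **kwargs) -> bool:
    """Hypothetical stand-in for the decision order shown in the diff."""
    # force_async is checked first, so it wins over every other signal.
    if kwargs.get("force_async", False):
        return True
    # force_sync is only consulted when force_async is absent or falsy.
    if kwargs.get("force_sync", False):
        return False
    # Assumption: a user flagged with block_execution runs in the foreground.
    return not user_blocks_execution


# Both flags set: force_async takes precedence.
both_flags = wants_async(force_async=True, force_sync=True)
# A blocking user still gets background execution when force_async is set.
blocking_user_forced = wants_async(user_blocks_execution=True, force_async=True)
```

Checking `force_async` before anything else is what lets the indexer stay in the background without the caller needing to know the user's preferences.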

dojo/middleware.py

Lines changed: 71 additions & 57 deletions
@@ -243,69 +243,83 @@ class AsyncSearchContextMiddleware(SearchContextMiddleware):
     """
 
     def _close_search_context(self, request):
-        """Override watson's close behavior to trigger async updates when above threshold."""
+        """Override watson's close behavior to always dispatch index updates asynchronously."""
         if search_context_manager.is_active():
-            from django.conf import settings  # noqa: PLC0415 circular import
-
-            # Extract tasks and check if we should trigger async update
-            captured_tasks = self._extract_tasks_for_async()
-
-            # Get total number of instances across all model types
-            total_instances = sum(len(pk_list) for pk_list in captured_tasks.values())
-            threshold = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_THRESHOLD", 100)
-
-            # only needed when at least one model instance is updated
-            if total_instances > 0:
-                # If threshold is below 0, async updating is disabled
-                if threshold < 0:
-                    logger.debug(f"AsyncSearchContextMiddleware: Async updating disabled (threshold={threshold}), using synchronous update")
-                elif total_instances > threshold:
-                    logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances > {threshold} threshold, triggering async update")
-                    self._trigger_async_index_update(captured_tasks)
-                    # Invalidate to prevent synchronous index update by super()._close_search_context()
-                    search_context_manager.invalidate()
-                else:
-                    logger.debug(f"AsyncSearchContextMiddleware: {total_instances} instances <= {threshold} threshold, using synchronous update")
-                    # Let watson handle synchronous update for small numbers
+            objects, _is_invalid = search_context_manager._stack[-1]
+            _drain_search_context_to_async(objects, source="AsyncSearchContextMiddleware")
 
+        # The set is now empty (or was already empty); watson's `end()` will
+        # bulk-save an empty iterator and short-circuit. No need to invalidate.
         super()._close_search_context(request)
 
-    def _extract_tasks_for_async(self):
-        """Extract tasks from the search context and group by model type for async processing."""
-        current_tasks, _is_invalid = search_context_manager._stack[-1]
-
-        # Group by model type for efficient batch processing
-        model_groups = {}
-        for _engine, obj in current_tasks:
-            model_key = f"{obj._meta.app_label}.{obj._meta.model_name}"
-            if model_key not in model_groups:
-                model_groups[model_key] = []
-            model_groups[model_key].append(obj.pk)
 
-        # Log what we extracted per model type
-        for model_key, pk_list in model_groups.items():
-            logger.debug(f"AsyncSearchContextMiddleware: Extracted {len(pk_list)} {model_key} instances for async indexing")
+def _drain_search_context_to_async(objects, source):
+    """
+    Group `objects` ({(engine, obj), ...}) by model, dispatch one
+    force_async celery task per WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE-sized
+    batch, and `set.discard()` the drained entries from `objects` in place.
+
+    `objects` is the `set` inside `search_context_manager._stack[-1][0]`.
+    Mutating it in place is safe because watson's `_stack` is `threading.local`
+    and callers (request close + the wrapped `add_to_context`) hold the
+    active reference.
+    """
+    if not objects:
+        return
+
+    from dojo.celery_dispatch import dojo_dispatch_task  # noqa: PLC0415 circular import
+    from dojo.tasks import update_watson_search_index_for_model  # noqa: PLC0415 circular import
+
+    # Snapshot before grouping so we don't iterate while mutating.
+    snapshot = list(objects)
+    model_groups = {}
+    for _engine, obj in snapshot:
+        model_key = f"{obj._meta.app_label}.{obj._meta.model_name}"
+        model_groups.setdefault(model_key, []).append(obj.pk)
+
+    batch_size = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000)
+    for model_name, pk_list in model_groups.items():
+        batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)]
+        # force_async=True keeps indexing off the request path even for users
+        # with block_execution=True — index updates are slow and never need
+        # to be synchronous from the user's perspective.
+        for i, batch in enumerate(batches, 1):
+            logger.debug(f"{source}: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances")
+            dojo_dispatch_task(update_watson_search_index_for_model, model_name, batch, force_async=True)
+
+    for entry in snapshot:
+        objects.discard(entry)
+
+
+def install_intermediate_flush_hook():
+    """
+    Wrap `watson.search.search_context_manager.add_to_context` with a
+    size-based flush. Once the per-request set reaches
+    `WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE`, drain it into async tasks
+    and clear it in place. Bounds memory on long-running requests
+    (large imports) and starts celery batches earlier instead of
+    dispatching all at end-of-request.
+
+    Idempotent — safe to call multiple times.
+    Setting WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE to 0 or below disables
+    the hook at runtime.
+    """
+    cls = search_context_manager.__class__
+    if getattr(cls, "_dd_intermediate_flush_installed", False):
+        return
 
-        return model_groups
+    original_add = cls.add_to_context
 
-    def _trigger_async_index_update(self, model_groups):
-        """Trigger async tasks to update search indexes, chunking large lists into batches of settings.WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE."""
-        if not model_groups:
+    def add_to_context_with_flush(self, engine, obj):
+        original_add(self, engine, obj)
+        threshold = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000)
+        if threshold <= 0 or not self._stack:
             return
+        objects, is_invalid = self._stack[-1]
+        if is_invalid or len(objects) < threshold:
+            return
+        _drain_search_context_to_async(objects, source="AsyncSearchContextMiddleware[intermediate]")
 
-        # Import here to avoid circular import
-        from django.conf import settings  # noqa: PLC0415 circular import
-
-        from dojo.celery_dispatch import dojo_dispatch_task  # noqa: PLC0415 circular import
-        from dojo.tasks import update_watson_search_index_for_model  # noqa: PLC0415 circular import
-
-        # Create tasks per model type, chunking large lists into configurable batches
-        for model_name, pk_list in model_groups.items():
-            # Chunk into batches using configurable batch size (compatible with Python 3.11)
-            batch_size = getattr(settings, "WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE", 1000)
-            batches = [pk_list[i:i + batch_size] for i in range(0, len(pk_list), batch_size)]
-
-            # Create tasks for each batch and log each one
-            for i, batch in enumerate(batches, 1):
-                logger.debug(f"AsyncSearchContextMiddleware: Triggering batch {i}/{len(batches)} for {model_name}: {len(batch)} instances")
-                dojo_dispatch_task(update_watson_search_index_for_model, model_name, batch)
+    cls.add_to_context = add_to_context_with_flush
+    cls._dd_intermediate_flush_installed = True
+    logger.debug("AsyncSearchContextMiddleware: intermediate flush hook installed on %s", cls.__name__)
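The drain-and-hook mechanics in this hunk can be tried without Django, watson, or celery. The sketch below is a simplified stand-in under stated assumptions: `Context`, `drain`, `install_flush_hook`, and the `dispatch` callback are hypothetical names, entries are plain `(model_key, pk)` tuples instead of `(engine, obj)` pairs, and the threshold is fixed at install time, whereas the hook in the diff re-reads the setting on every call.

```python
from collections import defaultdict

BATCH_SIZE = 3  # stand-in for WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE


def drain(objects: set, dispatch, batch_size: int = BATCH_SIZE) -> None:
    """Group entries by model, dispatch them in batches, empty the set in place."""
    if not objects:
        return
    snapshot = list(objects)  # copy first so the set is not mutated while iterating
    groups = defaultdict(list)
    for model_key, pk in snapshot:
        groups[model_key].append(pk)
    for model_key, pks in groups.items():
        for start in range(0, len(pks), batch_size):
            dispatch(model_key, pks[start:start + batch_size])
    for entry in snapshot:
        objects.discard(entry)


class Context:
    """Hypothetical stand-in for watson's search context manager."""

    def __init__(self):
        self.pending = set()

    def add(self, model_key, pk):
        self.pending.add((model_key, pk))


def install_flush_hook(cls, dispatch, threshold: int = BATCH_SIZE) -> None:
    """Wrap cls.add once so the pending set is drained when it reaches threshold."""
    if getattr(cls, "_flush_hook_installed", False):
        return  # idempotent: a second call must not wrap the wrapper
    original_add = cls.add

    def add_with_flush(self, model_key, pk):
        original_add(self, model_key, pk)
        if threshold > 0 and len(self.pending) >= threshold:
            drain(self.pending, dispatch, threshold)

    cls.add = add_with_flush
    cls._flush_hook_installed = True


dispatched = []
install_flush_hook(Context, lambda model, pks: dispatched.append((model, sorted(pks))))
install_flush_hook(Context, lambda model, pks: dispatched.append(("again", pks)))  # ignored

ctx = Context()
for pk in range(1, 8):
    ctx.add("dojo.finding", pk)
```

With a threshold of 3, seven additions trigger two intermediate drains and leave one entry pending for the end-of-request drain. The class-level flag is what makes the second install call a no-op.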

dojo/settings/settings.dist.py

Lines changed: 11 additions & 3 deletions
@@ -115,9 +115,17 @@
     DD_TAG_BULK_ADD_BATCH_SIZE=(int, 1000),
     # Tagulous slug truncate unique setting. Set to -1 to use tagulous internal default (5)
     DD_TAGULOUS_SLUG_TRUNCATE_UNIQUE=(int, -1),
-    # Minimum number of model updated instances before search index updates as performaed asynchronously. Set to -1 to disable async updates.
-    DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD=(int, 10),
+    # Batch size for async watson search-index update tasks. Also doubles as
+    # the per-request intermediate-flush threshold: once the in-memory watson
+    # context reaches this many pending objects mid-request,
+    # AsyncSearchContextMiddleware flushes them to async celery tasks instead
+    # of waiting for end-of-request. Set to 0 (or negative) to disable the
+    # intermediate flush.
     DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE=(int, 1000),
+    # When True, the async watson indexer auto-derives select_related/prefetch_related
+    # paths from each adapter's `fields`/`store` to avoid N+1 queries during indexing.
+    # Falls back to a plain queryset on any error (logged).
+    DD_WATSON_INDEX_PREFETCH_ENABLED=(bool, True),
     DD_FOOTER_VERSION=(str, ""),
     # models should be passed to celery by ID, default is False (for now)
     DD_DATABASE_ENGINE=(str, "django.db.backends.postgresql"),
@@ -869,8 +877,8 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param
 CELERY_IMPORTS = ("dojo.tools.tool_issue_updater", )
 
 # Watson async index update settings
-WATSON_ASYNC_INDEX_UPDATE_THRESHOLD = env("DD_WATSON_ASYNC_INDEX_UPDATE_THRESHOLD")
 WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE = env("DD_WATSON_ASYNC_INDEX_UPDATE_BATCH_SIZE")
+WATSON_INDEX_PREFETCH_ENABLED = env("DD_WATSON_INDEX_PREFETCH_ENABLED")
 
 # Celery beat scheduled tasks
 CELERY_BEAT_SCHEDULE = {

dojo/tasks.py

Lines changed: 7 additions & 2 deletions
@@ -182,6 +182,8 @@ def update_watson_search_index_for_model(model_name, pk_list, *args, **kwargs):
     """
     from watson.search import SearchContextManager, default_search_engine  # noqa: PLC0415 circular import
 
+    from dojo.utils_watson_prefetch import build_indexing_queryset  # noqa: PLC0415 circular import
+
     logger.debug(f"Starting async watson index update for {len(pk_list)} {model_name} instances")
 
     try:
@@ -194,8 +196,11 @@ def update_watson_search_index_for_model(model_name, pk_list, *args, **kwargs):
         app_label, model_name = model_name.split(".")
         model_class = apps.get_model(app_label, model_name)
 
-        # Bulk load instances and add them to the context
-        instances = model_class.objects.filter(pk__in=pk_list)
+        # Bulk load instances and add them to the context. The queryset auto-derives
+        # select_related/prefetch_related from the adapter's fields/store paths to
+        # avoid N+1 queries during indexing. Disable via DD_WATSON_INDEX_PREFETCH_ENABLED=False.
+        adapter = engine.get_adapter(model_class)
+        instances = build_indexing_queryset(model_class, pk_list, adapter)
         instances_added = 0
         instances_skipped = 0
 
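The task now delegates queryset construction to a helper that is allowed to fail without failing the indexing run. The general shape of that "optimize, but fall back to the plain version" pattern might look like the sketch below. All names here (`build_with_fallback`, `optimize`, `enabled`) are hypothetical, and a plain list stands in for the queryset so the example runs without Django.

```python
import logging

logger = logging.getLogger("watson_prefetch_sketch")  # hypothetical logger name


def build_with_fallback(base, optimize, enabled=True):
    """Return optimize(base), or base itself when disabled or when optimize raises."""
    if not enabled:
        return base  # toggle off: skip the optimization entirely
    try:
        return optimize(base)
    except Exception:
        # Log with traceback, then degrade to the slower but correct path.
        logger.exception("optimization failed, falling back to the plain value")
        return base


def broken(_base):
    raise RuntimeError("simulated derivation failure")


plain = [1, 2, 3]
optimized = build_with_fallback(plain, lambda rows: rows + ["prefetched"])
fallback = build_with_fallback(plain, broken)
disabled = build_with_fallback(plain, lambda rows: rows + ["prefetched"], enabled=False)
```

The trade-off is that a bug in the optimization shows up only as a logged exception and slower indexing, which is why the commit message stresses logging loudly.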

dojo/utils_watson_prefetch.py

Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
+"""
+Query prefetch helper for the async watson search indexer.
+
+Watson's `SearchAdapter._resolve_field` walks `__`-separated relation paths via
+per-instance `getattr`, which triggers one query per FK hop per object during
+indexing. For deep adapter `fields`/`store` paths (e.g.
+`finding__test__engagement__product__name`) on a 1000-row batch this means
+thousands of extra queries.
+
+`build_indexing_queryset` introspects the adapter paths against the model's
+`_meta`, classifies each prefix as FK chain (`select_related`) or M2M / reverse
+(`prefetch_related`), and applies them in a single query plan. On any failure
+the caller is expected to fall back to the plain queryset — watson still works
+correctly, just slower.
+
+Toggle: ``settings.WATSON_INDEX_PREFETCH_ENABLED`` (default True).
+"""
19+
import logging
20+
21+
from django.core.exceptions import FieldDoesNotExist
22+
23+
logger = logging.getLogger(__name__)
24+
25+
26+
def _classify_path(model, prefix):
27+
"""
28+
Walk a `__`-separated relation prefix against `model._meta`.
29+
30+
Returns
31+
-------
32+
"select" | "prefetch" | None
33+
- "select": pure FK / OneToOne chain (safe for select_related).
34+
- "prefetch": chain contains a many-to-many or reverse-many leg.
35+
- None: unresolvable (callable on adapter, GenericForeignKey, typo, etc.) —
36+
caller should drop this path.
37+
38+
"""
39+
is_multi = False
40+
current = model
41+
for part in prefix.split("__"):
42+
try:
43+
field = current._meta.get_field(part)
44+
except FieldDoesNotExist:
45+
return None
46+
if getattr(field, "many_to_many", False) or getattr(field, "one_to_many", False):
47+
is_multi = True
48+
related = getattr(field, "related_model", None)
49+
if related is None:
50+
# Reached a concrete field (e.g. CharField) — chain ends here. The
51+
# caller passes the prefix without the leaf, so this should be rare.
52+
return "prefetch" if is_multi else "select"
53+
current = related
54+
return "prefetch" if is_multi else "select"
55+
56+
57+
def derive_relation_paths(model, adapter):
58+
"""
59+
Inspect adapter `fields` + `store` and return ``(select_paths, prefetch_paths)``.
60+
61+
Each entry is a relation prefix suitable for passing to
62+
`QuerySet.select_related` / `QuerySet.prefetch_related`. Paths that cannot
63+
be resolved against ``model._meta`` are dropped (watson will resolve them
64+
at indexing time the slow way).
65+
"""
66+
select_paths = set()
67+
prefetch_paths = set()
68+
69+
raw_paths = tuple(getattr(adapter, "fields", ()) or ()) + tuple(getattr(adapter, "store", ()) or ())
70+
for path in raw_paths:
71+
if "__" not in path:
72+
continue
73+
prefix = path.rsplit("__", 1)[0]
74+
classification = _classify_path(model, prefix)
75+
if classification == "select":
76+
select_paths.add(prefix)
77+
elif classification == "prefetch":
78+
prefetch_paths.add(prefix)
79+
# None: drop silently — adapter property/GFK, watson handles at runtime.
80+
81+
return select_paths, prefetch_paths
82+
83+
84+
def build_indexing_queryset(model, pk_list, adapter):
85+
"""
86+
Build the queryset used by the async watson indexer.
87+
88+
Applies `select_related` / `prefetch_related` derived from the adapter when
89+
``settings.WATSON_INDEX_PREFETCH_ENABLED`` is True (default). On any error
90+
we log loudly and return the plain queryset so indexing still succeeds.
91+
"""
92+
from django.conf import settings # noqa: PLC0415 -- settings access at call time
93+
94+
base_qs = model.objects.filter(pk__in=pk_list)
95+
96+
if not getattr(settings, "WATSON_INDEX_PREFETCH_ENABLED", True):
97+
logger.debug(
98+
"WATSON_INDEX_PREFETCH_ENABLED=False, indexing %s with plain queryset",
99+
model.__name__,
100+
)
101+
return base_qs
102+
103+
try:
104+
select_paths, prefetch_paths = derive_relation_paths(model, adapter)
105+
except Exception:
106+
logger.exception(
107+
"Watson prefetch path derivation failed for %s — falling back to plain queryset",
108+
model.__name__,
109+
)
110+
return base_qs
111+
112+
if not select_paths and not prefetch_paths:
113+
return base_qs
114+
115+
try:
116+
qs = base_qs
117+
if select_paths:
118+
qs = qs.select_related(*select_paths)
119+
if prefetch_paths:
120+
qs = qs.prefetch_related(*prefetch_paths)
121+
logger.debug(
122+
"Watson indexing %s with select_related=%s prefetch_related=%s",
123+
model.__name__, sorted(select_paths), sorted(prefetch_paths),
124+
)
125+
except Exception:
126+
logger.exception(
127+
"Watson prefetch application failed for %s (select=%s prefetch=%s) — "
128+
"falling back to plain queryset",
129+
model.__name__, sorted(select_paths), sorted(prefetch_paths),
130+
)
131+
return base_qs
132+
else:
133+
return qs
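To see how adapter paths turn into `select_related` and `prefetch_related` prefixes, the classification can be mimicked against a toy schema instead of Django's `model._meta`. Everything in this sketch is illustrative: the `SCHEMA` dict, the model and field names in it, and the `classify` and `derive` helpers are assumptions made for the example, not DefectDojo's real model graph or the module's functions.

```python
# Toy schema (hypothetical): model -> {field name: (related model or None, is_multi)}
SCHEMA = {
    "Finding": {"title": (None, False), "test": ("Test", False), "tags": ("Tag", True)},
    "Test": {"engagement": ("Engagement", False)},
    "Engagement": {"product": ("Product", False)},
    "Product": {"name": (None, False)},
    "Tag": {"name": (None, False)},
}


def classify(model, prefix):
    """Return "select", "prefetch", or None for a __-separated relation prefix."""
    is_multi = False
    current = model
    for part in prefix.split("__"):
        field = SCHEMA[current].get(part)
        if field is None:
            return None  # unknown name: adapter property, typo, and so on
        related, multi = field
        is_multi = is_multi or multi
        if related is None:
            break  # concrete field reached, the chain ends here
        current = related
    return "prefetch" if is_multi else "select"


def derive(model, paths):
    """Split adapter paths into (select_paths, prefetch_paths) relation prefixes."""
    select_paths, prefetch_paths = set(), set()
    for path in paths:
        if "__" not in path:
            continue  # plain local field, nothing to join
        prefix = path.rsplit("__", 1)[0]  # drop the leaf attribute
        kind = classify(model, prefix)
        if kind == "select":
            select_paths.add(prefix)
        elif kind == "prefetch":
            prefetch_paths.add(prefix)
    return select_paths, prefetch_paths


select_paths, prefetch_paths = derive(
    "Finding",
    ["title", "test__engagement__product__name", "tags__name", "get_url__x"],
)
```

A single many-valued leg anywhere in the chain moves the whole prefix to the prefetch set, and the unresolvable `get_url__x` path is dropped, leaving it to be resolved at indexing time.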
