|
1 | 1 | """ |
2 | | -Tag inheritance — central coordination module. |
3 | | -
|
4 | | -Provides a thread-local ``batch()`` context manager that suppresses |
5 | | -per-instance inheritance work driven by ``m2m_changed`` and ``post_save`` |
6 | | -signals. While inside a batch, the signal handlers in |
7 | | -``dojo/tags_signals.py`` early-return; the calling code is responsible for |
8 | | -applying inheritance in bulk (e.g. via the importer's existing |
9 | | -``_bulk_inherit_tags`` path or ``propagate_tags_on_product_sync``). |
10 | | -
|
11 | | -This replaces the previous pattern of ``signals.m2m_changed.disconnect(...)`` |
12 | | -in importer hot loops, which was process-global and unsafe under threaded |
13 | | -gunicorn / Celery thread pools / ASGI threadpools (see PR description for |
14 | | -the full rationale). |
| 2 | +Tag inheritance — watson-style context manager. |
| 3 | +
|
| 4 | +Pattern mirrors `watson.search.SearchContextManager`: signal handlers |
| 5 | +register touched instances into the active context instead of running |
| 6 | +per-row inheritance work; the context flushes them in bulk on |
| 7 | +``flush()`` (called explicitly mid-batch) and on context exit. |
| 8 | +
|
| 9 | +Usage: |
| 10 | + with tag_inheritance.batch() as ctx: |
| 11 | + # bulk operations create/modify many instances |
| 12 | + ... |
| 13 | + ctx.flush() # optional, mid-batch sync (e.g. before JIRA push) |
| 14 | + ... |
| 15 | + # auto-flushes on outermost exit |
| 16 | +
|
| 17 | +The context lives in ``threading.local``, so concurrent threads (and |
| 18 | +Celery workers in non-prefork pools) are unaffected by other threads' |
| 19 | +batches. |
15 | 20 | """ |
16 | 21 | from __future__ import annotations |
17 | 22 |
|
18 | | -import contextlib |
| 23 | +import logging |
19 | 24 | import threading |
| 25 | +from collections import defaultdict |
20 | 26 | from contextlib import contextmanager |
21 | 27 |
|
| 28 | +logger = logging.getLogger(__name__) |
| 29 | + |
22 | 30 | _state = threading.local() |
23 | 31 |
|
24 | 32 |
|
class TagInheritanceContext:

    """
    Per-thread registrar for instances whose inherited tags need
    re-syncing in bulk.

    Touched instances are recorded as primary keys grouped by model
    class. ``flush()`` then syncs each group through
    ``_sync_inheritance_for_qs``: one call per model for ``Location``,
    one call per product for every other model.
    """

    def __init__(self):
        # Nesting depth of ``batch()`` blocks sharing this context; the
        # context counts as active while this is greater than zero.
        self._depth = 0
        # model_class -> set of instance pks
        self._touched: dict[type, set[int]] = defaultdict(set)
        # System-wide inheritance flag; looked up lazily on first use and
        # then cached for the lifetime of the context. Per-product flags
        # are read off the product objects returned by ``get_products``.
        self._system_inheritance: bool | None = None

    def is_active(self) -> bool:
        """Return True while at least one ``batch()`` block holds this context."""
        return self._depth > 0

    def system_inheritance_enabled(self) -> bool:
        """
        Return the system-wide ``enable_product_tag_inheritance`` setting.

        The setting is fetched once via ``get_system_setting`` and the
        boolean result is cached on the context, so later changes to the
        setting are not observed by this context.
        """
        if self._system_inheritance is None:
            # Local import to avoid circulars at module import time.
            from dojo.utils import get_system_setting  # noqa: PLC0415
            self._system_inheritance = bool(get_system_setting("enable_product_tag_inheritance"))
        return self._system_inheritance

    def is_inheritance_enabled_for(self, instance) -> bool:
        """
        True when the given instance is under at least one product whose
        inheritance is enabled (per-product flag or system-wide).
        """
        from dojo.tags_signals import get_products  # noqa: PLC0415
        products = get_products(instance)
        # Falsy entries (e.g. ``None``) are skipped; a product without the
        # attribute counts as "flag off".
        if any(getattr(p, "enable_product_tag_inheritance", False) for p in products if p):
            return True
        # No per-product opt-in: fall back to the cached system-wide flag.
        return self.system_inheritance_enabled()

    def add(self, instance) -> None:
        """
        Register an instance for bulk-sync at next flush.

        No-op when ``instance`` is ``None``, has no primary key yet, or
        inheritance is disabled for it, so the bulk path stays cheap on
        inheritance-off products.
        """
        if instance is None or getattr(instance, "pk", None) is None:
            return
        if not self.is_inheritance_enabled_for(instance):
            return
        # Only the pk is kept; ``flush()`` re-fetches the rows itself.
        self._touched[type(instance)].add(instance.pk)

    def flush(self) -> None:
        """
        Bulk-sync inherited tags for every registered instance, then
        clear the registry.

        Returns immediately when nothing has been registered. The
        registry is swapped for an empty one *before* any sync work
        starts, so instances registered while the flush is running land
        in the new registry and are handled by a later flush.
        """
        if not self._touched:
            return
        # Local imports to avoid circulars at module import time.
        from dojo.location.models import Location  # noqa: PLC0415
        from dojo.product.helpers import (  # noqa: PLC0415
            _build_location_target_names_map,
            _sync_inheritance_for_qs,
        )
        from dojo.tags_signals import get_products  # noqa: PLC0415

        touched, self._touched = self._touched, defaultdict(set)

        for model_class, pks in touched.items():
            if not pks:
                continue
            queryset = model_class.objects.filter(pk__in=pks)
            if model_class is Location:
                # Location target = union of related products' tags. Use
                # the bulk precompute helper, then look each location up
                # in the resulting map (empty set when absent).
                target_map = _build_location_target_names_map(list(pks))
                _sync_inheritance_for_qs(
                    queryset,
                    # Default-arg binding pins this iteration's map.
                    target_names_per_child=lambda loc, _m=target_map: _m.get(loc.pk, set()),
                )
            else:
                # All other children belong to one product (Finding via
                # test, Endpoint via product, etc.). Group by product so
                # each group gets one target name set.
                instances = list(queryset)
                by_product: dict[int, list] = defaultdict(list)
                product_by_id: dict[int, object] = {}
                for inst in instances:
                    products = get_products(inst)
                    for product in products:
                        if product is None:
                            continue
                        by_product[product.id].append(inst)
                        product_by_id[product.id] = product
                for product_id, group in by_product.items():
                    product = product_by_id[product_id]
                    # Tag names are read once per product, not once per child.
                    target_names = {tag.name for tag in product.tags.all()}
                    _sync_inheritance_for_qs(
                        group,
                        # Default-arg binding pins this group's name set.
                        target_names_per_child=lambda _c, _t=target_names: _t,
                    )
| 134 | + |
| 135 | + |
def current() -> TagInheritanceContext | None:
    """
    Return the active context for this thread, if any.

    Reads the ``ctx`` attribute of the module's thread-local ``_state``;
    returns ``None`` when the calling thread has not set one.
    """
    return getattr(_state, "ctx", None)
| 139 | + |
| 140 | + |
def is_in_batch() -> bool:
    """
    Return True when the current thread is inside an active ``batch()``.

    A thread is "in a batch" only when it has a context *and* that
    context reports itself active via ``is_active()``.
    """
    ctx = current()
    return ctx is not None and ctx.is_active()
28 | 145 |
|
29 | 146 |
|
@contextmanager
def batch():
    """
    Open a tag-inheritance context for the calling thread.

    Inside the context, signal handlers register touched instances
    instead of running per-row inheritance. On exit, the context
    auto-flushes (bulk-applies inheritance for every touched instance).

    Reentrant: nested ``with`` blocks share the context until the
    outermost block exits.

    Yields:
        The thread's ``TagInheritanceContext``; call ``flush()`` on it
        for an explicit mid-batch sync.

    Notes:
        The exit flush runs from a ``finally`` clause, so it also runs
        when the body raises. The depth counter has already dropped back
        to zero by the time that flush executes, so ``is_active()`` is
        False for its duration.
    """
    ctx = getattr(_state, "ctx", None)
    # Only the block that created the context removes it from the
    # thread-local state again.
    owner = ctx is None
    if owner:
        ctx = TagInheritanceContext()
        _state.ctx = ctx
    ctx._depth += 1
    try:
        yield ctx
    finally:
        ctx._depth -= 1
        if ctx._depth <= 0:
            try:
                ctx.flush()
            finally:
                # Drop the context even if the flush failed, so it does
                # not linger on a reused thread.
                if owner:
                    del _state.ctx
0 commit comments