Skip to content

Commit fae4c3d

Browse files
ldrozdz93claude
andcommitted
perf(plugin): bulk TaggedItem writes + suppress m2m_changed re-fire
NetBox's instance.tags.set([...]) triggers m2m_changed per tagged instance. The signal handler then (a) writes another ObjectChange row and (b) re-runs serialize_for_event(instance) for the events_queue. For changesets with thousands of tagged objects this dominates apply cost on top of the per-object signal overhead. Add bulk_tags.apply_tags_bulk(instance_tag_pairs, request) which: - Bypasses the through-table m2m signal by bulk-writing extras_taggeditem rows directly via bulk_create(ignore_conflicts= True, batch_size=500). _diode_preload['tag_ids_by_slug'] (from the preload step) is reused; any missing slugs are resolved via one fallback query. - Mirrors instance.tags.set([...]) replace-set semantics by deleting existing TaggedItem rows for the (content_type, object_id) targets in one combined DELETE before the bulk INSERT. - Re-emits a single enqueue_event(OBJECT_UPDATED) per tagged instance so the events_queue payload reflects the post-tag state, without the duplicate ObjectChange row. applier._apply_change pops 'tags' from 'data' before serializer.save() runs and buffers (instance, tag_input) on request._diode_tag_pairs; apply_changeset flushes via apply_tags_bulk after its main loop. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d03a4a4 commit fae4c3d

3 files changed

Lines changed: 307 additions & 1 deletion

File tree

netbox_diode_plugin/api/applier.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from extras.models import Tag
1313
from rest_framework.exceptions import ValidationError as ValidationError
1414

15+
from .bulk_tags import apply_tags_bulk, supports_tags
1516
from .common import NON_FIELD_ERRORS, Change, ChangeSet, ChangeSetException, ChangeSetResult, ChangeType, error_from_validation_error
1617
from .matcher import find_existing_object, invalidate_find_obj_entry
1718
from .plugin_utils import get_object_type_model, legal_fields
@@ -27,6 +28,11 @@ def apply_changeset(change_set: ChangeSet, request) -> ChangeSetResult:
2728
_validate_change_set(change_set)
2829
_preload_changeset_cache(change_set, request)
2930

31+
# Collect (instance, tag_input) pairs as we apply changes; flushed via
32+
# apply_tags_bulk after the main loop so we issue one DELETE+bulk INSERT
33+
# for the whole changeset instead of per-instance m2m_changed re-fires.
34+
request._diode_tag_pairs = []
35+
3036
created = {}
3137
for change in change_set.changes:
3238
change_type = change.change_type
@@ -55,6 +61,10 @@ def apply_changeset(change_set: ChangeSet, request) -> ChangeSetResult:
5561
logger.error(f"Integrity error {object_type}: {e} {data}")
5662
raise _err(f"created a conflict with an existing {object_type}", object_type, "__all__")
5763

64+
# Flush deferred tag writes in one bulk pass.
65+
apply_tags_bulk(request._diode_tag_pairs, request)
66+
request._diode_tag_pairs = []
67+
5868
return ChangeSetResult(
5969
id=change_set.id,
6070
)
@@ -163,14 +173,23 @@ def _create_or_find_instance(data: dict, object_type: str, serializer_class, req
163173

164174

165175
def _apply_change(data: dict, model_class: models.Model, change: Change, created: dict, request):
176+
# Pull tags out of `data` BEFORE serializer.save() so the serializer's
177+
# `tag.set([...])` side effect — which fires m2m_changed and triggers a
178+
# duplicate ObjectChange + a re-run of serialize_for_event — does not run.
179+
# We buffer (instance, tag_input) and flush via apply_tags_bulk after the
180+
# main apply_changeset loop.
181+
deferred_tags = None
182+
if supports_tags(model_class) and isinstance(data.get("tags"), list):
183+
deferred_tags = data.pop("tags")
184+
166185
serializer_class = get_serializer_for_model(model_class)
167186
change_type = change.change_type
187+
instance = None
168188

169189
if change_type == ChangeType.CREATE:
170190
# For component types that may be auto-created from e.g. DeviceType or ModuleType templates,
171191
# try to find existing object first before attempting to create.
172192
# This prevents duplicates when components are instantiated during Device/Module save()
173-
instance = None
174193
if _is_auto_created_component(change.object_type):
175194
instance = _try_find_and_update_existing_instance(data, change.object_type, serializer_class, request)
176195

@@ -195,6 +214,10 @@ def _apply_change(data: dict, model_class: models.Model, change: Change, created
195214
serializer.save()
196215
invalidate_find_obj_entry(change.object_type, instance.id)
197216

217+
if deferred_tags is not None and instance is not None:
218+
request._diode_tag_pairs.append((instance, deferred_tags))
219+
220+
198221
def _set_path(data, path, value):
199222
path = path.split(".")
200223
key = path.pop(0)
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
#!/usr/bin/env python
2+
# Copyright 2026 NetBox Labs, Inc.
3+
"""
4+
Bulk TaggedItem writes for diode apply paths.
5+
6+
PR 4 of BULK-ORM Sprint 1.
7+
8+
NetBox's `instance.tags.set([...])` triggers `m2m_changed` (post_clear +
9+
post_add) per instance. The signal handler re-fires
10+
`handle_changed_object`, which (a) writes another ObjectChange UPDATE and
11+
(b) calls `enqueue_event` again — that re-runs the expensive
12+
`serialize_for_event(instance)`. Multiplied by every tagged object in a
13+
changeset this dominates the apply path.
14+
15+
This module bypasses the through-table m2m signal by writing
16+
`extras_taggeditem` rows directly via `bulk_create` and re-emitting a
17+
single `enqueue_event` per instance to refresh the queued payload with
18+
the post-tag state. PR 3's `deferred_changelog` already collapses the
19+
ObjectChange side; this module collapses the TaggedItem side.
20+
"""
21+
22+
from __future__ import annotations
23+
24+
import logging
25+
from collections import defaultdict
26+
from typing import Any
27+
28+
from core.events import OBJECT_UPDATED
29+
from django.contrib.contenttypes.models import ContentType
30+
from django.db.models import Q
31+
from extras.events import enqueue_event
32+
from extras.models import Tag, TaggedItem
33+
from netbox.context import events_queue
34+
from netbox.models.features import TagsMixin
35+
36+
logger = logging.getLogger(__name__)
37+
38+
39+
def supports_tags(model_or_instance) -> bool:
40+
"""Return True if the model (or instance's class) inherits from TagsMixin."""
41+
cls = model_or_instance if isinstance(model_or_instance, type) else type(model_or_instance)
42+
return issubclass(cls, TagsMixin)
43+
44+
45+
def apply_tags_bulk(instance_tag_pairs: list[tuple[Any, list]], request) -> None:
46+
"""
47+
Write tags for a list of (instance, tag_input) pairs via bulk_create.
48+
49+
`tag_input` may be a list of slug strings, a list of integer tag IDs, or
50+
a list of dicts with a "slug" key (the same shapes the serializer accepts).
51+
Tags listed for an instance fully replace its existing tag set, mirroring
52+
`instance.tags.set([...])` semantics — but without the m2m_changed re-fire.
53+
"""
54+
if not instance_tag_pairs:
55+
return
56+
57+
preload = getattr(request, "_diode_preload", None) or {}
58+
tag_ids_by_slug: dict[str, int] = preload.get("tag_ids_by_slug", {}) or {}
59+
_backfill_missing_tag_ids(instance_tag_pairs, tag_ids_by_slug)
60+
61+
rows, target_keys, instances_to_event = _build_rows(
62+
instance_tag_pairs, tag_ids_by_slug,
63+
)
64+
65+
_replace_existing(target_keys)
66+
if rows:
67+
TaggedItem.objects.bulk_create(rows, ignore_conflicts=True, batch_size=500)
68+
_re_emit_events(instances_to_event, request)
69+
70+
71+
def _backfill_missing_tag_ids(pairs, tag_ids_by_slug: dict[str, int]) -> None:
72+
missing: set[str] = set()
73+
for _instance, tag_input in pairs:
74+
for raw in tag_input or []:
75+
slug = _slug_from(raw)
76+
if slug and slug not in tag_ids_by_slug:
77+
missing.add(slug)
78+
if missing:
79+
for tag_id, slug in Tag.objects.filter(slug__in=missing).values_list("id", "slug"):
80+
tag_ids_by_slug[slug] = tag_id
81+
82+
83+
def _build_rows(pairs, tag_ids_by_slug: dict[str, int]):
84+
ct_by_model: dict[type, ContentType] = {}
85+
rows: list[TaggedItem] = []
86+
target_keys: dict[int, set[int]] = defaultdict(set)
87+
instances_to_event: list[Any] = []
88+
89+
for instance, tag_input in pairs:
90+
if instance is None or instance.pk is None:
91+
continue
92+
ct = _ct_for(instance, ct_by_model)
93+
target_keys[ct.id].add(instance.pk)
94+
instances_to_event.append(instance)
95+
for raw in tag_input or []:
96+
tag_id = _resolve_tag_id(raw, tag_ids_by_slug)
97+
if tag_id is None:
98+
logger.warning("apply_tags_bulk: could not resolve tag %r for %s", raw, instance)
99+
continue
100+
rows.append(TaggedItem(content_type_id=ct.id, object_id=instance.pk, tag_id=tag_id))
101+
102+
return rows, target_keys, instances_to_event
103+
104+
105+
def _ct_for(instance, ct_by_model: dict[type, ContentType]) -> ContentType:
106+
model = type(instance)
107+
ct = ct_by_model.get(model)
108+
if ct is None:
109+
ct = ContentType.objects.get_for_model(model)
110+
ct_by_model[model] = ct
111+
return ct
112+
113+
114+
def _replace_existing(target_keys: dict[int, set[int]]) -> None:
115+
if not target_keys:
116+
return
117+
q = Q()
118+
for ct_id, obj_ids in target_keys.items():
119+
q |= Q(content_type_id=ct_id, object_id__in=obj_ids)
120+
TaggedItem.objects.filter(q).delete()
121+
122+
123+
def _re_emit_events(instances, request) -> None:
124+
if not instances:
125+
return
126+
queue = events_queue.get()
127+
for instance in instances:
128+
try:
129+
enqueue_event(queue, instance, request, OBJECT_UPDATED)
130+
except Exception as e:
131+
logger.warning("apply_tags_bulk: enqueue_event failed for %s: %s", instance, e)
132+
events_queue.set(queue)
133+
134+
135+
def _slug_from(raw) -> str | None:
136+
if isinstance(raw, str):
137+
return raw
138+
if isinstance(raw, dict):
139+
s = raw.get("slug")
140+
return s if isinstance(s, str) else None
141+
return None
142+
143+
144+
def _resolve_tag_id(raw, tag_ids_by_slug: dict[str, int]) -> int | None:
145+
if isinstance(raw, int):
146+
return raw
147+
slug = _slug_from(raw)
148+
if slug is not None:
149+
return tag_ids_by_slug.get(slug)
150+
return None
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#!/usr/bin/env python
2+
# Copyright 2026 NetBox Labs, Inc.
3+
"""Diode NetBox Plugin - PR 4 bulk TaggedItem tests."""
4+
5+
import uuid
6+
from types import SimpleNamespace
7+
8+
from core.models import ObjectChange
9+
from dcim.models import Site
10+
from django.contrib.contenttypes.models import ContentType
11+
from django.db import connection
12+
from django.test import RequestFactory, TestCase
13+
from django.test.utils import CaptureQueriesContext
14+
from extras.models import Tag, TaggedItem
15+
from netbox.context import current_request, events_queue
16+
17+
from netbox_diode_plugin.api.bulk_tags import apply_tags_bulk
18+
from netbox_diode_plugin.api.deferred_changelog import deferred_changelog
19+
from netbox_diode_plugin.plugin_config import get_diode_user
20+
21+
22+
def _make_request():
23+
rf = RequestFactory()
24+
req = rf.post("/x")
25+
req.id = uuid.uuid4()
26+
req.user = get_diode_user()
27+
return req
28+
29+
30+
class BulkTaggedItemTestCase(TestCase):
31+
"""Verifies apply_tags_bulk (PR 4)."""
32+
33+
def setUp(self):
34+
"""Create K distinct tags + N untagged sites."""
35+
self.tags = [Tag.objects.create(name=f"t{i}", slug=f"t{i}") for i in range(3)]
36+
self.sites = [Site.objects.create(name=f"S{i}", slug=f"s{i}") for i in range(4)]
37+
38+
def test_n_objects_one_bulk_insert(self):
39+
"""N tagged objects produce 1 INSERT INTO extras_taggeditem (modulo batches)."""
40+
req = _make_request()
41+
# Mimic preload contract from PR 2.
42+
req._diode_preload = {
43+
"tag_ids_by_slug": {t.slug: t.id for t in self.tags},
44+
}
45+
token = current_request.set(req)
46+
try:
47+
pairs = [(s, [t.slug for t in self.tags]) for s in self.sites]
48+
with CaptureQueriesContext(connection) as ctx:
49+
apply_tags_bulk(pairs, req)
50+
inserts = [
51+
q for q in ctx.captured_queries
52+
if "INSERT INTO" in q["sql"] and '"extras_taggeditem"' in q["sql"]
53+
]
54+
assert len(inserts) == 1, [q["sql"] for q in inserts]
55+
finally:
56+
current_request.reset(token)
57+
58+
def test_tag_set_per_object_matches_baseline(self):
59+
"""Final tag set per instance equals what `instance.tags.set([...])` would produce."""
60+
req = _make_request()
61+
req._diode_preload = {
62+
"tag_ids_by_slug": {t.slug: t.id for t in self.tags},
63+
}
64+
token = current_request.set(req)
65+
try:
66+
pairs = [(self.sites[0], ["t0", "t1"]), (self.sites[1], ["t2"])]
67+
apply_tags_bulk(pairs, req)
68+
ct = ContentType.objects.get_for_model(Site)
69+
70+
for site, expected_slugs in [(self.sites[0], {"t0", "t1"}), (self.sites[1], {"t2"})]:
71+
got = set(
72+
Tag.objects.filter(
73+
id__in=TaggedItem.objects.filter(
74+
content_type=ct, object_id=site.pk
75+
).values_list("tag_id", flat=True)
76+
).values_list("slug", flat=True)
77+
)
78+
assert got == expected_slugs, (site, got, expected_slugs)
79+
finally:
80+
current_request.reset(token)
81+
82+
def test_no_extra_audit_row_per_tagged_object(self):
83+
"""N audit rows for N tagged-only updates — no doubled rows from m2m re-fire."""
84+
req = _make_request()
85+
req._diode_preload = {
86+
"tag_ids_by_slug": {t.slug: t.id for t in self.tags},
87+
}
88+
token = current_request.set(req)
89+
try:
90+
ObjectChange.objects.all().delete()
91+
with deferred_changelog():
92+
# Touch each instance once via a normal save, then bulk-tag.
93+
for s in self.sites:
94+
s.description = "touched"
95+
s.save()
96+
apply_tags_bulk([(s, ["t0"]) for s in self.sites], req)
97+
98+
ct = ContentType.objects.get_for_model(Site)
99+
row_count = ObjectChange.objects.filter(
100+
changed_object_type=ct,
101+
changed_object_id__in=[s.pk for s in self.sites],
102+
).count()
103+
assert row_count == len(self.sites), row_count
104+
finally:
105+
current_request.reset(token)
106+
107+
def test_events_queue_populated_with_post_tag_state(self):
108+
"""events_queue has one entry per tagged instance, with key matching baseline format."""
109+
req = _make_request()
110+
req._diode_preload = {
111+
"tag_ids_by_slug": {t.slug: t.id for t in self.tags},
112+
}
113+
token = current_request.set(req)
114+
# Reset events queue.
115+
q_token = events_queue.set({})
116+
try:
117+
apply_tags_bulk([(self.sites[0], ["t0"])], req)
118+
queued = events_queue.get()
119+
expected_key = f"dcim.site:{self.sites[0].pk}"
120+
assert expected_key in queued, list(queued.keys())
121+
finally:
122+
events_queue.reset(q_token)
123+
current_request.reset(token)
124+
125+
def test_empty_pairs_is_noop(self):
126+
"""No pairs -> no INSERT, no DELETE."""
127+
req = _make_request()
128+
req._diode_preload = {"tag_ids_by_slug": {}}
129+
with CaptureQueriesContext(connection) as ctx:
130+
apply_tags_bulk([], req)
131+
# An empty input may still touch a tx savepoint, but must not write taggeditem.
132+
for q in ctx.captured_queries:
133+
assert '"extras_taggeditem"' not in q["sql"], q["sql"]

0 commit comments

Comments
 (0)