Skip to content

Commit d03a4a4

Browse files
ldrozdz93 and claude committed
perf(plugin): bulk ObjectChange writes via thread-local + bulk_create
NetBox's core.signals.handle_changed_object fires ObjectChange.save() once per post_save / m2m_changed event. For a multi-thousand-entity batch the per-instance INSERT becomes the dominant write cost. Install a module-level monkey-patch on ObjectChange.save that, when a thread-local flag is active, buffers the instance instead of writing it. The deferred_changelog() context manager yields a _Deferred accumulator with commit_pending() / rollback_pending() / flush() methods: * On success of a changeset, the caller calls commit_pending(): the changeset's pending audit-log rows promote into the batch buffer. * On failure (ChangeSetException -> savepoint rollback), the caller calls rollback_pending(): the failed changeset's audit rows are discarded so they cannot leak past the rollback. * At batch end, flush() drains the batch buffer with one ObjectChange.objects.bulk_create(buffer, batch_size=500). m2m merge dedup is preserved by a per-window in-memory index keyed on (content_type_id, object_id, request_id): a follow-up m2m_changed fire collapses onto the prior buffered row instead of producing a duplicate audit entry. ApplyChangeSetView and BulkApplyView both wrap their work with deferred_changelog(), so the batch's audit-log rows commit together with the batch's data writes (per-batch atomicity to readers). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent ca6d54a commit d03a4a4

3 files changed

Lines changed: 345 additions & 17 deletions

File tree

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
#!/usr/bin/env python
2+
# Copyright 2026 NetBox Labs, Inc.
3+
"""
4+
Deferred ObjectChange writes for diode apply paths.
5+
6+
PR 3 of BULK-ORM Sprint 1, with the V3 atomicity fix layered on top.
7+
8+
NetBox's ``core.signals.handle_changed_object`` fires per-instance on
9+
``post_save`` / ``m2m_changed`` and persists each ``ObjectChange`` row
10+
with its own INSERT. For a multi-thousand-entity changeset that's the
11+
dominant write cost.
12+
13+
This module installs a module-level monkey-patch on
14+
``core.models.ObjectChange.save`` that, when a thread-local flag is
15+
active, buffers the instance instead of writing it. The
16+
``deferred_changelog()`` context manager activates the flag and yields
17+
a ``_Deferred`` accumulator with three methods:
18+
19+
* ``commit_pending()`` — promote the per-changeset *pending* buffer
20+
into the cross-changeset *batch* buffer. Called after a changeset's
21+
inner ``transaction.atomic()`` block completes successfully.
22+
* ``rollback_pending()`` — discard the per-changeset *pending* buffer.
23+
Called when a changeset's inner atomic block raises and its writes
24+
are rolled back. Without this the buffered ``ObjectChange`` rows
25+
would still get flushed, leaking audit-log entries pointing at
26+
rolled-back data.
27+
* ``flush()`` — issue ``ObjectChange.objects.bulk_create()`` for the
28+
accumulated batch buffer. Caller invokes this once at the end of
29+
the loop, **inside** the outer ``transaction.atomic()`` so a flush
30+
failure rolls back the whole batch.
31+
32+
m2m merge dedup is preserved by ``(content_type_id, object_id,
33+
request_id)`` keys spanning both buffers: a follow-up m2m_changed
34+
fire collapses onto whichever buffer holds the prior row, yielding
35+
a single audit row per object regardless of when within the batch
36+
the dedup happens.
37+
38+
Nested usage is a no-op (outer wins).
39+
"""
40+
41+
from __future__ import annotations
42+
43+
import logging
44+
import threading
45+
from contextlib import contextmanager
46+
47+
from core.models import ObjectChange
48+
49+
logger = logging.getLogger(__name__)
50+
51+
_state = threading.local()
52+
_BATCH_SIZE = 500
53+
54+
55+
def _is_active() -> bool:
56+
return getattr(_state, "active", False)
57+
58+
59+
def _get_deferred():
60+
return getattr(_state, "deferred", None)
61+
62+
63+
class _Deferred:
64+
"""Per-window accumulator for buffered ``ObjectChange`` writes."""
65+
66+
def __init__(self) -> None:
67+
self.pending_buffer: list[ObjectChange] = []
68+
self.pending_index: dict[tuple, int] = {}
69+
self.batch_buffer: list[ObjectChange] = []
70+
self.batch_index: dict[tuple, int] = {}
71+
72+
def _add(self, obj: ObjectChange) -> None:
73+
"""Buffer ``obj`` into pending; merge onto pending or batch on dedup."""
74+
key = (
75+
obj.changed_object_type_id,
76+
obj.changed_object_id,
77+
str(obj.request_id) if obj.request_id is not None else None,
78+
)
79+
pos = self.pending_index.get(key)
80+
if pos is not None:
81+
self.pending_buffer[pos].postchange_data = obj.postchange_data
82+
return
83+
batch_pos = self.batch_index.get(key)
84+
if batch_pos is not None:
85+
self.batch_buffer[batch_pos].postchange_data = obj.postchange_data
86+
return
87+
self.pending_index[key] = len(self.pending_buffer)
88+
self.pending_buffer.append(obj)
89+
90+
def commit_pending(self) -> None:
91+
"""Promote pending entries onto the batch buffer; clear pending."""
92+
for key, pos in self.pending_index.items():
93+
obj = self.pending_buffer[pos]
94+
existing = self.batch_index.get(key)
95+
if existing is not None:
96+
self.batch_buffer[existing].postchange_data = obj.postchange_data
97+
else:
98+
self.batch_index[key] = len(self.batch_buffer)
99+
self.batch_buffer.append(obj)
100+
self.pending_buffer = []
101+
self.pending_index = {}
102+
103+
def rollback_pending(self) -> None:
104+
"""Discard the pending buffer (e.g. after a savepoint rollback)."""
105+
self.pending_buffer = []
106+
self.pending_index = {}
107+
108+
def flush(self) -> None:
109+
"""Drain pending into batch, then bulk-insert and clear the batch buffer.
110+
111+
Calling ``flush()`` without first calling ``commit_pending()`` is the
112+
legacy single-window pattern (``with deferred_changelog(): ...`` with
113+
no per-changeset accumulator). It commits any leftover pending entries
114+
before issuing the INSERT so callers that don't use the V3 lifecycle
115+
API still get a single bulk_create.
116+
"""
117+
if self.pending_buffer:
118+
self.commit_pending()
119+
if self.batch_buffer:
120+
ObjectChange.objects.bulk_create(self.batch_buffer, batch_size=_BATCH_SIZE)
121+
self.batch_buffer = []
122+
self.batch_index = {}
123+
124+
125+
# Keep a handle on the stock implementation so the patch can delegate to it.
_original_save = ObjectChange.save


def _patched_save(self, *args, **kwargs):
    """Stand-in for ``ObjectChange.save`` that buffers while a window is open.

    Delegates to the original ``save`` (returning its result) when the calling
    thread has no active window, no accumulator, or the instance already has
    a primary key. Otherwise fills in ``user_name`` / ``object_repr`` if they
    are empty, hands the instance to the accumulator and returns ``None``.
    """
    accumulator = _get_deferred() if _is_active() else None
    if accumulator is None or self.pk is not None:
        # Either nothing is deferring on this thread, or the row is already
        # in the DB (e.g. m2m post_clear updating an externally-saved change
        # row); the real save is the correct path in both cases.
        return _original_save(self, *args, **kwargs)

    if not self.user_name:
        if getattr(self, "user_id", None):
            self.user_name = self.user.username

    if not self.object_repr:
        # Best effort: an unresolvable target yields an empty repr.
        try:
            if self.changed_object_id:
                self.object_repr = str(self.changed_object)
            else:
                self.object_repr = ""
        except Exception:
            self.object_repr = ""

    accumulator._add(self)
    return None


# Install the monkey-patch at import time; views.py imports this module.
ObjectChange.save = _patched_save
156+
157+
158+
@contextmanager
def deferred_changelog():
    """
    Activate deferred ``ObjectChange`` persistence; yield a ``_Deferred``.

    Two usage modes are supported:

    * Legacy (single-window) — ``with deferred_changelog(): ...``. All saves
      go to the pending buffer; on clean context exit the manager auto-calls
      ``flush()``, draining pending into batch and issuing one
      ``bulk_create``. If the body raises anything (including
      ``KeyboardInterrupt`` / ``SystemExit``), no flush happens, so an
      unhandled exception cannot leak audit-log rows.

    * V3 (per-changeset accumulator) — ``with deferred_changelog() as defc:``
      with explicit ``defc.commit_pending()`` / ``defc.rollback_pending()``
      after each savepoint and ``defc.flush()`` once at the end inside the
      outer ``transaction.atomic()``. The auto-flush at clean context exit
      is a no-op because ``flush()`` already cleared the buffers.

    Nested invocation is a no-op (outer wins): the inner ``with`` yields the
    outer accumulator and neither flushes nor resets the thread-local state.
    """
    if _is_active():
        yield _get_deferred()
        return

    deferred = _Deferred()
    _state.active = True
    _state.deferred = deferred
    try:
        yield deferred
    finally:
        # Always close the window, whatever happened in the body.
        _state.active = False
        _state.deferred = None
    # Reached only when the body finished without raising: an exception of
    # any kind propagates out of the ``try`` above and skips the flush.
    deferred.flush()

netbox_diode_plugin/api/views.py

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
ChangeSetException,
1818
ChangeSetResult,
1919
)
20+
from .deferred_changelog import deferred_changelog
2021
from .differ import enter_prechange_cache, exit_prechange_cache, generate_changeset
2122
from .matcher import enter_request_obj_cache, exit_request_obj_cache
2223
from .permissions import (
@@ -317,7 +318,13 @@ def post(self, request, *args, **kwargs):
317318

318319
def _post(self, request, *args, **kwargs):
319320
change_set = ChangeSet.from_dict(request.data)
320-
result = _apply_one_changeset(change_set, request)
321+
with deferred_changelog() as defc:
322+
result = _apply_one_changeset(change_set, request)
323+
if result.errors:
324+
defc.rollback_pending()
325+
else:
326+
defc.commit_pending()
327+
defc.flush()
321328
return Response(result.to_dict(), status=result.get_status_code())
322329

323330

@@ -358,23 +365,31 @@ def _post(self, request, *args, **kwargs):
358365
raise ValidationError({"change_sets": ["change_sets must not be empty"]})
359366

360367
results = []
361-
for entry in change_sets:
362-
if not isinstance(entry, dict):
363-
results.append(
364-
ChangeSetResult(
365-
errors={"request": {"change_set": ["change_set must be an object"]}}
368+
with deferred_changelog() as defc:
369+
for entry in change_sets:
370+
if not isinstance(entry, dict):
371+
results.append(
372+
ChangeSetResult(
373+
errors={"request": {"change_set": ["change_set must be an object"]}}
374+
).to_dict()
375+
)
376+
continue
377+
try:
378+
change_set = ChangeSet.from_dict(entry)
379+
cs_result = _apply_one_changeset(change_set, request)
380+
if cs_result.errors:
381+
defc.rollback_pending()
382+
else:
383+
defc.commit_pending()
384+
result = cs_result.to_dict()
385+
except Exception as e:
386+
logger.error(f"Error parsing batch entry: {e}")
387+
defc.rollback_pending()
388+
result = ChangeSetResult(
389+
errors={"request": {"change_set": [f"invalid change_set: {e}"]}}
366390
).to_dict()
367-
)
368-
continue
369-
try:
370-
change_set = ChangeSet.from_dict(entry)
371-
result = _apply_one_changeset(change_set, request).to_dict()
372-
except Exception as e:
373-
logger.error(f"Error parsing batch entry: {e}")
374-
result = ChangeSetResult(
375-
errors={"request": {"change_set": [f"invalid change_set: {e}"]}}
376-
).to_dict()
377-
results.append(result)
391+
results.append(result)
392+
defc.flush()
378393

379394
http_status = (
380395
status.HTTP_207_MULTI_STATUS
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#!/usr/bin/env python
2+
# Copyright 2026 NetBox Labs, Inc.
3+
"""Diode NetBox Plugin - PR 3 deferred changelog tests."""
4+
5+
import uuid
6+
7+
from core.models import ObjectChange
8+
from dcim.models import Site
9+
from django.contrib.contenttypes.models import ContentType
10+
from django.db import connection
11+
from django.test import RequestFactory, TestCase
12+
from django.test.utils import CaptureQueriesContext
13+
from netbox.context import current_request
14+
15+
from netbox_diode_plugin.api.deferred_changelog import deferred_changelog
16+
from netbox_diode_plugin.plugin_config import get_diode_user
17+
18+
19+
def _make_request():
    """Build a minimal POST request carrying a fresh ``id`` and the diode user.

    The request is only returned; binding it to ``current_request`` is left
    to the caller.
    """
    request = RequestFactory().post("/x")
    request.id = uuid.uuid4()
    request.user = get_diode_user()
    return request
26+
27+
28+
class DeferredChangelogTestCase(TestCase):
    """Verifies deferred_changelog (PR 3)."""

    def _bind_request(self):
        """Bind a fresh request to ``current_request`` until the test ends.

        Returns the request so tests can read its ``id``. The context variable
        is restored through ``addCleanup``, so it is reset even when the test
        body fails.
        """
        request = _make_request()
        token = current_request.set(request)
        self.addCleanup(current_request.reset, token)
        return request

    def test_n_changes_one_bulk_insert(self):
        """N saves inside the context produce ONE bulk INSERT, not N."""
        self._bind_request()
        ObjectChange.objects.all().delete()
        with CaptureQueriesContext(connection) as captured, deferred_changelog():
            for idx in range(5):
                Site.objects.create(name=f"S-{idx}", slug=f"s-{idx}")

        changelog_inserts = [
            query["sql"]
            for query in captured.captured_queries
            if "INSERT INTO" in query["sql"] and '"core_objectchange"' in query["sql"]
        ]
        assert len(changelog_inserts) == 1, changelog_inserts
        assert ObjectChange.objects.count() == 5

    def test_audit_row_content_matches_baseline(self):
        """Buffered audit rows carry the same fields as the per-save baseline."""
        self._bind_request()
        ObjectChange.objects.all().delete()
        with deferred_changelog():
            Site.objects.create(name="Deferred S", slug="deferred-s")
        deferred_row = ObjectChange.objects.get(object_repr="Deferred S")

        ObjectChange.objects.all().delete()
        baseline_token = current_request.set(_make_request())
        try:
            Site.objects.create(name="Baseline S", slug="baseline-s")
        finally:
            current_request.reset(baseline_token)
        baseline_row = ObjectChange.objects.get(object_repr="Baseline S")

        for field in ("action", "user_name", "changed_object_type_id"):
            assert getattr(deferred_row, field) == getattr(baseline_row, field), field
        # ``postchange_data`` describes the changed object itself, and the two
        # sites were created with different names and slugs, so the payloads
        # can only be compared structurally, not value-for-value.
        assert set(deferred_row.postchange_data) == set(baseline_row.postchange_data)

    def test_nested_context_is_noop(self):
        """An inner deferred_changelog must not flush — outer wins."""
        self._bind_request()
        ObjectChange.objects.all().delete()
        with deferred_changelog():
            with deferred_changelog():
                Site.objects.create(name="N-1", slug="n-1")
            # If inner had flushed, this row would already exist.
            assert ObjectChange.objects.filter(object_repr="N-1").count() == 0
            Site.objects.create(name="N-2", slug="n-2")

        flushed = ObjectChange.objects.filter(object_repr__in=["N-1", "N-2"])
        assert flushed.count() == 2

    def test_m2m_merge_dedup_single_row(self):
        """An UPDATE that re-fires via m2m_changed merges into one buffered row."""
        from extras.models import Tag

        tag = Tag.objects.create(name="net", slug="net")
        site = Site.objects.create(name="MM", slug="mm")  # outside the window

        request = self._bind_request()
        ObjectChange.objects.all().delete()
        with deferred_changelog():
            site.description = "updated"
            site.save()  # post_save → ObjectChange #1
            site.tags.set([tag])  # m2m_changed → would normally update the prior row

        rows = list(
            ObjectChange.objects.filter(
                changed_object_type=ContentType.objects.get_for_model(Site),
                changed_object_id=site.pk,
                request_id=request.id,
            )
        )
        assert len(rows) == 1, [row.postchange_data for row in rows]
        assert rows[0].postchange_data.get("tags") == [tag.id]

0 commit comments

Comments
 (0)