diff --git a/netbox_diode_plugin/__init__.py b/netbox_diode_plugin/__init__.py index 4517851..1b0c0e2 100644 --- a/netbox_diode_plugin/__init__.py +++ b/netbox_diode_plugin/__init__.py @@ -78,10 +78,25 @@ class NetBoxDiodePluginConfig(PluginConfig): # fields (empty-QuerySet truthiness bug), so under load every # instance.clean()/save() re-queries extras_customfield. Cache # is invalidated on CustomField post_save/post_delete signals. + # + # apply_buffer_change_logging: keep the audit trail intact but + # cut the per-save change-logging cost. During apply, + # ObjectChange serialisation skips the per-m2m-relation SELECTs + # that dominate `to_objectchange` (one query per m2m field on + # every save), and the rows are collected in an in-memory + # buffer instead of being written one at a time. On successful + # commit the buffer is flushed as a single `bulk_create`, with + # all objects' m2m relations resolved in one query per relation, + # and `post_save` is re-emitted so receivers connected to + # `post_save(ObjectChange)` still fire. The flush runs inline at + # commit, so the audit log stays immediately consistent. + # Mutually exclusive in intent with `apply_bypass_change_logging` + # - if both are enabled, bypass wins (no rows produced at all). "apply_bypass_counter_updates": False, "apply_bypass_change_logging": False, "apply_bypass_search_indexing": False, "apply_bypass_customfield_query_cache": False, + "apply_buffer_change_logging": False, # Per-entity retry on Postgres deadlock (SQLSTATE 40P01) during # the apply phase of /bulk-plan-apply. 
Cross-batch concurrent diff --git a/netbox_diode_plugin/api/applier.py b/netbox_diode_plugin/api/applier.py index 64ac62c..c4be6f4 100644 --- a/netbox_diode_plugin/api/applier.py +++ b/netbox_diode_plugin/api/applier.py @@ -10,8 +10,9 @@ from django.db.utils import IntegrityError from rest_framework.exceptions import ValidationError as ValidationError +from .change_log_buffer import snapshot_for_apply from .common import NON_FIELD_ERRORS, Change, ChangeSet, ChangeSetException, ChangeSetResult, ChangeType, error_from_validation_error -from .matcher import find_existing_object, invalidate_find_obj_entry +from .matcher import find_existing_object, invalidate_find_obj_entry, requires_pre_save_match from .plugin_utils import get_object_type_model, legal_fields from .profile import profiled from .supported_models import get_serializer_for_model @@ -78,6 +79,7 @@ def _try_find_and_update_existing_instance(data: dict, object_type: str, seriali try: instance = find_existing_object(data, object_type) if instance: + snapshot_for_apply(instance) serializer = serializer_class(instance, data=data, partial=True, context={"request": request}) serializer.is_valid(raise_exception=True) result = serializer.save() @@ -110,8 +112,13 @@ def _apply_change(data: dict, model_class: models.Model, change: Change, created # For component types that may be auto-created from e.g. DeviceType or ModuleType templates, # try to find existing object first before attempting to create. # This prevents duplicates when components are instantiated during Device/Module save() + # The same find-first path also handles types whose logical match + # criteria are not enforced by a DB unique constraint (see + # matcher._REQUIRES_PRE_SAVE_MATCH): concurrent planners would + # otherwise each emit CREATE for the same logical row and both + # inserts would succeed without IntegrityError to fall back on. 
instance = None - if _is_auto_created_component(change.object_type): + if _is_auto_created_component(change.object_type) or requires_pre_save_match(change.object_type): instance = _try_find_and_update_existing_instance(data, change.object_type, serializer_class, request) if not instance: @@ -124,6 +131,7 @@ def _apply_change(data: dict, model_class: models.Model, change: Change, created elif change_type == ChangeType.UPDATE: if object_id := change.object_id: instance = model_class.objects.get(id=object_id) + snapshot_for_apply(instance) serializer = serializer_class(instance, data=data, partial=True, context={"request": request}) serializer.is_valid(raise_exception=True) serializer.save() 
diff --git a/netbox_diode_plugin/api/change_log_buffer.py b/netbox_diode_plugin/api/change_log_buffer.py
new file mode 100644
index 0000000..178eaba
--- /dev/null
+++ b/netbox_diode_plugin/api/change_log_buffer.py
@@ -0,0 +1,545 @@
+#!/usr/bin/env python
+# Copyright 2026 NetBox Labs, Inc.
+"""
+Buffer NetBox's ObjectChange writes during diode applies and flush them as one bulk_create.
+
+NetBox's ``handle_changed_object`` receiver (``core/signals.py``) runs
+``instance.to_objectchange(action)`` on every saved model, and that
+serialisation is expensive: Django's ``serializers.serialize('json',
+[obj])`` issues one SELECT per many-to-many relation on the model (its
+``handle_m2m_field`` walks each m2m manager). For a model like
+``dcim.Interface`` with three m2m relations that is three DB
+round-trips on *every* save, which dominates the apply request critical
+path - measured at roughly 40% of total apply throughput on production
+load.
+
+This module removes that cost while preserving the audit trail (and the
+receivers connected to ``post_save(sender=ObjectChange)``) in three
+pieces:
+
+  1. **Fast serialisation.** While a diode apply buffer is active,
+     ``ChangeLoggingMixin.serialize_object`` is routed through
+     ``_fast_serialize_object``, which restricts Django's serializer to
+     the model's local (non-m2m) fields via the ``fields=`` allowlist.
+     Django then skips ``handle_m2m_field`` entirely, so no per-relation
+     SELECT is issued. Tags are left as an empty placeholder rather than
+     queried per save. Both the omitted m2m fields and the tags are
+     re-added in bulk at flush (see piece 3). Outside the apply path the
+     original serialiser runs unchanged.
+
+  2. **In-memory buffer.** A request-scoped ``contextvars.ContextVar``
+     holds a dict of ObjectChange instances keyed by
+     ``(content_type_id, changed_object_id)``. m2m_changed events for an
+     object already in the buffer merge their ``postchange_data`` in
+     memory, replacing the upstream SELECT-then-UPDATE pair with a dict
+     lookup.
+
+  3. **Batched flush with bulk enrichment.** The per-entity buffers
+     are collected into a request-level batch (via
+     ``request_change_logging_batch``). After the apply commits, the
+     batch is flushed: for each model, one query per m2m relation and
+     one query for tags resolve them for *all* buffered objects at once
+     and patch the values back into ``postchange_data``; then a single
+     ``bulk_create`` writes every row and ``post_save`` is re-emitted so
+     downstream receivers still fire. This turns O(saves x relations)
+     round-trips into O(models x relations) per request.
+
+Gating: the plugin setting ``apply_buffer_change_logging`` (default
+``False``) controls whether the buffer activates. With it off the
+buffered handler delegates straight through to upstream NetBox and the
+fast serialiser is never engaged.
+
+Flush timing: the per-entity append and the request-level flush are both
+registered via ``transaction.on_commit``. A rolled-back entity's append
+callback never fires, so its rows never reach the batch; only changes
+that genuinely committed are written. The flush runs after commit, so
+the bulk m2m queries observe the final committed relation state.
+
+m2m ordering: enrichment records each relation as a sorted list of
+related primary keys. This is deterministic but may differ in order
+(not membership) from the queryset order Django's serializer would
+produce on the synchronous path.
+
+Pre_delete is intentionally left untouched: NetBox's delete handler runs
+protection-rule validation, which must keep firing, and deletes are
+uncommon on the auto-apply path.
+"""
+
+import json
+import logging
+from collections import defaultdict
+from contextlib import contextmanager
+from contextvars import ContextVar
+
+from core.choices import ObjectChangeActionChoices
+from core.events import OBJECT_CREATED, OBJECT_UPDATED
+from core.models import ObjectChange
+from core.signals import handle_changed_object as _original_handler
+from django.contrib.contenttypes.models import ContentType
+from django.core import serializers as dj_serializers
+from django.db import transaction
+from django.db.models.signals import m2m_changed, post_save
+from extras.events import enqueue_event
+from extras.models import Tag
+from extras.models.tags import TaggedItem
+from extras.utils import is_taggable
+from netbox.config import get_config
+from netbox.context import current_request, events_queue
+from netbox.models.features import ChangeLoggingMixin
+from netbox.plugins import get_plugin_config
+
+from .change_log_bypass import _bypass_active, _guarded_handler
+from .change_log_bypass import _enabled as _bypass_enabled
+
+logger = logging.getLogger(__name__)
+
+# When the bypass module is enabled it has already swapped NetBox's
+# `handle_changed_object` for its own `_guarded_handler` at the
+# receiver-registry level. Capture whichever function is currently the
+# "real" handler so our wrapper can delegate to it on the no-buffer
+# path. Order matters here: this module imports the bypass module
+# above, which means the bypass module's import-time swap has already
+# run by the time we read the reference.
+_previous_handler = _guarded_handler if _bypass_enabled else _original_handler
+
+# Per-entity buffer. The value type is `dict[tuple[int, int], ObjectChange]`
+# keyed by (content_type_id, changed_object_id). A value of None
+# means the per-entity context manager is not active and the wrapper
+# should delegate to upstream.
+_apply_change_buffer: ContextVar = ContextVar(
+    "diode_apply_change_buffer", default=None
+)
+
+# Request-scoped batch. The value type is `list[ObjectChange] | None`.
+# When set (via `request_change_logging_batch`), each per-entity buffer
+# appends its ObjectChange instances here instead of flushing on its
+# own. The outer context manager flushes the whole batch as one
+# bulk_create at request end, which is also where m2m enrichment can
+# resolve every buffered object's relations in one query per relation.
+_request_batch: ContextVar = ContextVar(
+    "diode_request_change_logging_batch", default=None
+)
+
+
+def _fast_serialize_object(obj, exclude=None):
+    """
+    Serialise ``obj`` for change logging without issuing per-m2m SELECTs.
+
+    Mirrors ``utilities.serialization.serialize_object`` but passes
+    ``fields=`` to Django's serializer restricted to the model's local
+    (non-m2m) fields. Django skips ``handle_m2m_field`` for any field
+    not in the allowlist, so the m2m round-trips are eliminated. The
+    omitted m2m fields are re-added in bulk by ``_enrich_m2m`` at flush.
+
+    FK fields are matched by Django on ``attname[:-3]`` (the field name
+    without the ``_id`` suffix), so passing field *names* keeps every
+    scalar and FK field while dropping only the m2m relations.
+
+    Tags get an empty placeholder rather than a per-save query;
+    ``_enrich_tags`` fills it in bulk at flush. The placeholder is only
+    added for taggable models, which is also how a row is later
+    recognised as needing tag enrichment.
+    """
+    field_names = [f.name for f in obj._meta.local_fields]
+    json_str = dj_serializers.serialize("json", [obj], fields=field_names)
+    data = json.loads(json_str)[0]["fields"]
+    exclude = exclude or []
+
+    if "custom_field_data" in data:
+        data["custom_fields"] = data.pop("custom_field_data")
+
+    # Resolving tags here would cost one query per save. Leave an empty
+    # placeholder for taggable models and let `_enrich_tags` fill it in
+    # bulk at flush. The placeholder's presence also marks the row as
+    # taggable: is_taggable needs the live instance we have here but not
+    # at flush time. (The `_tags` instance cache upstream consults is not
+    # populated on the diode apply path that this fast route serves.)
+    if is_taggable(obj):
+        data["tags"] = []
+
+    # Drop excluded keys and, like upstream ``serialize_object``, any
+    # private (underscore-prefixed) attributes, so buffered rows keep
+    # the same key set as unbuffered ones.
+    for key in list(data):
+        if key in exclude or key.startswith("_"):
+            data.pop(key)
+
+    return data
+
+
+_original_serialize_object = ChangeLoggingMixin.serialize_object
+
+
+def _serialize_object_gated(self, exclude=None):
+    """
+    Route serialisation through the fast path only while a buffer is active.
+
+    Installed over ``ChangeLoggingMixin.serialize_object`` at import.
+    When no diode apply buffer is active in the current context this
+    delegates to the original method, so all non-apply change logging
+    (the UI, the REST API, etc.) is byte-for-byte unchanged.
+    """
+    if _apply_change_buffer.get() is not None:
+        return _fast_serialize_object(self, exclude=exclude or [])
+    return _original_serialize_object(self, exclude=exclude)
+
+
+ChangeLoggingMixin.serialize_object = _serialize_object_gated
+
+
+def snapshot_for_apply(instance):
+    """
+    Capture a prechange snapshot for a diode-applied update.
+
+    NetBox records prechange state by calling ``instance.snapshot()`` in its
+    view and DRF viewset layers (``get_object_with_snapshot`` and the bulk
+    update/destroy mixins). The diode apply path applies through a plain
+    APIView and a direct ``serializer.save()``, so it bypasses those and would
+    otherwise record no ``prechange_data`` for updates. Call this after
+    fetching the instance and before saving to bring diode-applied updates to
+    parity with how NetBox itself records every other update.
+
+    The prechange must be format-consistent with the postchange this module
+    produces, or the changelog diff reports spurious changes:
+
+    - Buffer inactive: defer to NetBox's ``snapshot()`` (full serialiser),
+      which matches the unbuffered postchange exactly.
+    - Buffer active: build the prechange the same way the buffered postchange
+      is built - scalar fields via the fast serialiser, m2m and tags resolved
+      now and sorted to match ``_enrich_m2m`` / ``_enrich_tags``. They must be
+      read here because the before-state is gone once the update commits, so
+      the flush-time enrichment cannot recover them.
+    """
+    if not hasattr(instance, "serialize_object"):
+        return
+
+    if _apply_change_buffer.get() is None:
+        instance.snapshot()
+        return
+
+    exclude = ["last_updated"] if get_config().CHANGELOG_SKIP_EMPTY_CHANGES else []
+    data = _fast_serialize_object(instance, exclude=exclude)
+    for field in instance._meta.local_many_to_many:
+        if field.serialize:
+            data[field.name] = sorted(
+                getattr(instance, field.name).values_list("pk", flat=True)
+            )
+    if is_taggable(instance) and "tags" in data:
+        data["tags"] = sorted(instance.tags.values_list("name", flat=True))
+
+    instance._prechange_snapshot = data
+
+
+def _classify_signal(instance, kwargs):
+    """
+    Mirror of NetBox's event-type / m2m-flag detection.
+
+    Returns ``(event_type, action, m2m_changed_flag)`` for the buffer
+    path to act on, or ``None`` if the signal does not produce an
+    ObjectChange. Kept in lockstep with
+    ``core/signals.py:handle_changed_object``.
+    """
+    if kwargs.get("created"):
+        return OBJECT_CREATED, ObjectChangeActionChoices.ACTION_CREATE, False
+    if "created" in kwargs:
+        return OBJECT_UPDATED, ObjectChangeActionChoices.ACTION_UPDATE, False
+    if kwargs.get("action") in ("post_add", "post_remove") and kwargs.get("pk_set"):
+        return OBJECT_UPDATED, ObjectChangeActionChoices.ACTION_UPDATE, True
+    if kwargs.get("action") == "post_clear":
+        if kwargs.get("model") == Tag and getattr(instance, "_prechange_snapshot", {}).get("tags"):
+            return OBJECT_UPDATED, ObjectChangeActionChoices.ACTION_UPDATE, True
+    return None
+
+
+def _buffered_handler(sender, instance, **kwargs):
+    """
+    Receiver wrapper installed in place of NetBox's ``handle_changed_object``.
+
+    When no buffer is active in the current context, delegates to the
+    previously-connected handler (NetBox's default, or the bypass
+    wrapper if that feature is enabled). When a buffer is active,
+    captures the would-be ObjectChange in the buffer rather than
+    saving it.
+    """
+    buffer = _apply_change_buffer.get()
+    if buffer is None:
+        return _previous_handler(sender, instance, **kwargs)
+
+    # If apply_bypass_change_logging is also active in this context,
+    # bypass wins: no ObjectChange row, no buffered entry, no event.
+    # This keeps the combined-settings semantics predictable - enabling
+    # both flags lands on "no change logging" rather than "buffered
+    # change logging".
+    if _bypass_active.get():
+        return None
+
+    if not hasattr(instance, "to_objectchange"):
+        return None
+
+    request = current_request.get()
+    if request is None:
+        return None
+
+    classified = _classify_signal(instance, kwargs)
+    if classified is None:
+        return None
+    event_type, action, m2m_changed_flag = classified
+
+    objectchange = instance.to_objectchange(action)
+    key = (ContentType.objects.get_for_model(instance).id, instance.pk)
+
+    if m2m_changed_flag and key in buffer:
+        # In-memory equivalent of the upstream "find prior ObjectChange
+        # by (content_type, object_id, request_id) and merge
+        # postchange_data". No DB round-trip.
+        buffer[key].postchange_data = objectchange.postchange_data
+    elif objectchange and objectchange.has_changes:
+        # Pre-populate the fields that ObjectChange.save() would
+        # otherwise set (user_name, object_repr). bulk_create skips
+        # the model's save() override, so these must be set on the
+        # instance before queuing it.
+        objectchange.user = request.user
+        objectchange.user_name = request.user.username if request.user else ""
+        objectchange.request_id = request.id
+        # Truncate to the column width: bulk_create performs no model
+        # validation, so an over-long repr would fail the flush insert.
+        repr_max = ObjectChange._meta.get_field("object_repr").max_length
+        objectchange.object_repr = str(instance)[:repr_max]
+        buffer[key] = objectchange
+
+    if m2m_changed_flag:
+        # Match upstream behaviour: ensure subsequent reads see the
+        # fresh m2m assignments.
+        instance.refresh_from_db()
+
+    # Append to the request-scoped events queue exactly as upstream
+    # does. NetBox flushes this queue at `request_finished`, which is
+    # how downstream signal consumers receive their payloads. Buffering
+    # the ObjectChange writes does not change this path.
+    queue = events_queue.get()
+    enqueue_event(queue, instance, request, event_type)
+    events_queue.set(queue)
+
+    return None
+
+
+# Install the wrapper unconditionally at import time. The wrapper
+# itself is the no-op gate (it checks the contextvar on every call),
+# so installing it always is the cheapest correct option and keeps
+# tests able to exercise the buffer path without re-wiring receivers
+# at runtime. The disconnect/connect pair runs once per worker process
+# at module import, before any request is served, so it is not subject
+# to the threading hazard that motivated the bypass module's design.
+post_save.disconnect(_previous_handler)
+m2m_changed.disconnect(_previous_handler)
+post_save.connect(_buffered_handler)
+m2m_changed.connect(_buffered_handler)
+
+
+def _enrich_m2m(objectchanges):
+    """
+    Re-add m2m relations to buffered ObjectChange rows in bulk.
+
+    ``_fast_serialize_object`` omits m2m fields to avoid a per-save
+    SELECT per relation. This restores them: rows are grouped by content
+    type, and for each m2m relation on the model a single through-table
+    query resolves the relation for every object at once. The resolved
+    primary keys are written back into each row's ``postchange_data`` as
+    a sorted list, matching the membership Django's serializer would
+    have recorded.
+
+    Must run after the apply transaction commits so the through-table
+    reads observe the final relation state.
+    """
+    by_ct: dict[int, list] = defaultdict(list)
+    for oc in objectchanges:
+        if oc.postchange_data is not None and oc.changed_object_id is not None:
+            by_ct[oc.changed_object_type_id].append(oc)
+
+    for ct_id, rows in by_ct.items():
+        model = ContentType.objects.get_for_id(ct_id).model_class()
+        if model is None:
+            continue
+
+        m2m_fields = [f for f in model._meta.local_many_to_many if f.serialize]
+        if not m2m_fields:
+            continue
+
+        obj_ids = [oc.changed_object_id for oc in rows]
+        for field in m2m_fields:
+            through = field.remote_field.through
+            src_col = field.m2m_column_name()
+            tgt_col = field.m2m_reverse_name()
+
+            relation: dict[int, list] = defaultdict(list)
+            pairs = through.objects.filter(
+                **{f"{src_col}__in": obj_ids}
+            ).values_list(src_col, tgt_col)
+            for src_id, tgt_id in pairs:
+                relation[src_id].append(tgt_id)
+
+            for oc in rows:
+                oc.postchange_data[field.name] = sorted(relation.get(oc.changed_object_id, []))
+
+
+def _enrich_tags(objectchanges):
+    """
+    Fill the tag placeholder left by ``_fast_serialize_object`` in bulk.
+
+    ``_fast_serialize_object`` records ``tags: []`` for taggable models
+    instead of resolving tags per save (one query each). This resolves
+    them for all buffered objects at once: rows are grouped by content
+    type and one query over ``extras_taggeditem`` returns every object's
+    tag names, written back as a sorted list - matching what upstream
+    ``serialize_object`` records.
+
+    Only rows that carry the ``tags`` placeholder (taggable models) are
+    touched, so non-taggable rows keep no ``tags`` key, exactly as
+    upstream. Must run after the apply transaction commits so the reads
+    observe the final tag assignments.
+    """
+    by_ct: dict[int, list] = defaultdict(list)
+    for oc in objectchanges:
+        if (
+            oc.postchange_data is not None
+            and "tags" in oc.postchange_data
+            and oc.changed_object_id is not None
+        ):
+            by_ct[oc.changed_object_type_id].append(oc)
+
+    for ct_id, rows in by_ct.items():
+        obj_ids = [oc.changed_object_id for oc in rows]
+        tags_by_obj: dict[int, list] = defaultdict(list)
+        pairs = TaggedItem.objects.filter(
+            content_type_id=ct_id, object_id__in=obj_ids
+        ).values_list("object_id", "tag__name")
+        for obj_id, tag_name in pairs:
+            tags_by_obj[obj_id].append(tag_name)
+
+        for oc in rows:
+            oc.postchange_data["tags"] = sorted(tags_by_obj.get(oc.changed_object_id, []))
+
+
+def _flush_objectchanges(objectchanges):
+    """
+    Persist buffered ObjectChange rows as one bulk_create and re-emit post_save.
+
+    bulk_create does not fire ``post_save``; we re-emit it for each row
+    so receivers connected to ``post_save(sender=ObjectChange)`` fire
+    exactly once. The full kwargs set Django's own ``_save_table`` would
+    pass is supplied because some NetBox receivers (e.g.
+    ``update_denormalized_fields``) declare ``raw`` as required.
+    """
+    if not objectchanges:
+        return
+
+    _enrich_m2m(objectchanges)
+    _enrich_tags(objectchanges)
+
+    # While buffering, postchange_data lacks m2m/tag values, so the
+    # handler's ``has_changes`` test cannot detect a no-op update. Now
+    # that both sides are complete, drop rows that record no change,
+    # as the unbuffered path would.
+    objectchanges = [
+        oc for oc in objectchanges
+        if oc.prechange_data != oc.postchange_data
+    ]
+    if not objectchanges:
+        return
+
+    created = ObjectChange.objects.bulk_create(objectchanges)
+    for obj in created:
+        post_save.send(
+            sender=ObjectChange,
+            instance=obj,
+            created=True,
+            update_fields=None,
+            raw=False,
+            using=obj._state.db,
+        )
+
+
+@contextmanager
+def buffered_change_logging():
+    """
+    Collect ObjectChange writes during an apply and flush them in bulk.
+
+    No-op when ``apply_buffer_change_logging`` is False (the default),
+    which means the buffered handler still runs but delegates straight
+    through to upstream NetBox without touching the buffer.
+
+    On a successful apply the buffered ObjectChange instances are handed
+    off for flushing. If a request-level batch is active (the view
+    wrapped its entity loop in ``request_change_logging_batch``) the
+    instances are appended to the batch and the outer context manager
+    flushes everything as one bulk_create at request end. Otherwise this
+    flushes its own buffer directly.
+
+    Both paths register their work via ``transaction.on_commit`` so a
+    rolled-back apply drops the buffered rows without persisting them and
+    the bulk m2m enrichment observes committed relation state.
+    """
+    if not get_plugin_config("netbox_diode_plugin", "apply_buffer_change_logging"):
+        yield
+        return
+
+    token = _apply_change_buffer.set({})
+    try:
+        yield
+        buffer = _apply_change_buffer.get()
+        if not buffer:
+            return
+
+        instances = list(buffer.values())
+        batch = _request_batch.get()
+        if batch is not None:
+            transaction.on_commit(lambda: batch.extend(instances))
+        else:
+            transaction.on_commit(lambda: _flush_objectchanges(instances))
+    finally:
+        _apply_change_buffer.reset(token)
+
+
+@contextmanager
+def request_change_logging_batch():
+    """
+    Collect every per-entity buffer in a request and flush them as one bulk_create.
+
+    Endpoints that process many entities per HTTP request
+    (``/bulk-plan-apply/``, ``/bulk-apply/``) wrap their entity loop in
+    this context. Each per-entity ``buffered_change_logging`` appends its
+    ObjectChange instances to a shared list; on exit the whole list is
+    flushed once. Flushing at request scope is also what lets
+    ``_enrich_m2m`` resolve every object's relations in one query per
+    relation instead of one per entity.
+
+    Per-entity rollback semantics are preserved: each entity's append is
+    registered via ``transaction.on_commit`` on its own atomic block, so
+    a failed entity contributes nothing. The flush itself is registered
+    via ``on_commit`` in ``finally`` so a partial batch from the entities
+    that did commit is still written even if a later entity raised.
+
+    No-op when ``apply_buffer_change_logging`` is False (matches the
+    per-entity context manager's gate, so the two stay in lockstep).
+    """
+    if not get_plugin_config("netbox_diode_plugin", "apply_buffer_change_logging"):
+        yield
+        return
+
+    batch: list = []
+    token = _request_batch.set(batch)
+    try:
+        yield
+    finally:
+        # Register the flush via on_commit rather than calling it inline
+        # so it queues after the per-entity appends. Django fires queued
+        # on_commit callbacks FIFO, so registering ours last guarantees a
+        # fully-populated batch. When the request has no surrounding
+        # atomic block, on_commit fires immediately - by which point the
+        # per-entity atomics have already committed and appended.
+        try:
+            transaction.on_commit(lambda: _flush_objectchanges(batch))
+        finally:
+            # With no open transaction on_commit runs the flush inline;
+            # always reset the contextvar, even if that flush raises.
+            _request_batch.reset(token)
diff --git a/netbox_diode_plugin/api/matcher.py b/netbox_diode_plugin/api/matcher.py
index 73924b0..941287e 100644
--- a/netbox_diode_plugin/api/matcher.py
+++ b/netbox_diode_plugin/api/matcher.py
@@ -91,6 +91,53 @@ def invalidate_find_obj_entry(object_type: str, object_id: int):
 # These should represent the likely intent of a user when
 # matching existing objects.
 #
+# Object types whose logical match criteria are NOT backed by a DB
+# unique constraint. For CREATE changes on these types the applier
+# must call find_existing_object BEFORE serializer.save(); otherwise
+# concurrent planners can each emit CREATE for the same logical row
+# and both inserts succeed, producing duplicates. The standard
+# IntegrityError fallback in _create_or_find_instance cannot catch
+# this because save() does not fail.
+#
+# The specific gaps (see _LOGICAL_MATCHERS below for the criteria):
+#   - dcim.macaddress: NetBox has no unique constraint on
+#     (mac_address, assigned_object_type, assigned_object_id).
+#   - dcim.modulebay: matched by (name, device); NetBox's parent-aware
+#     constraint does not catch unscoped duplicates the matcher dedupes.
+#   - ipam.vlan: NetBox's (group, vid) constraint does not enforce
+#     uniqueness when group is NULL.
+#   - ipam.vlangroup: NetBox does not enforce uniqueness of name when
+#     scope_type is NULL.
+#   - ipam.vrf: NetBox enforces uniqueness on rd, not name; multiple
+#     VRFs with rd=NULL and the same name are otherwise allowed.
+#   - virtualization.cluster: NetBox does not enforce uniqueness of
+#     name across scopes; matched by (name, scope_type, scope_id) or
+#     by name alone when unscoped and unparented.
+#   - virtualization.virtualmachine: NetBox does not enforce uniqueness
+#     of name when cluster is NULL.
+#   - wireless.wirelesslan: NetBox does not enforce ssid uniqueness.
+#
+# This closes the common race (concurrent plan, sequential apply).
+# It does not close TOCTOU under truly concurrent apply across
+# replicas - that would require a DB unique constraint or a
+# coordinating lock.
+_REQUIRES_PRE_SAVE_MATCH = frozenset({
+    "dcim.macaddress",
+    "dcim.modulebay",
+    "ipam.vlan",
+    "ipam.vlangroup",
+    "ipam.vrf",
+    "virtualization.cluster",
+    "virtualization.virtualmachine",
+    "wireless.wirelesslan",
+})
+
+
+def requires_pre_save_match(object_type: str) -> bool:
+    """Whether the applier must look up an existing row before CREATE."""
+    return object_type in _REQUIRES_PRE_SAVE_MATCH
+
+
 _LOGICAL_MATCHERS = {
     "dcim.macaddress": lambda: [
         ObjectMatchCriteria(
diff --git a/netbox_diode_plugin/api/views.py b/netbox_diode_plugin/api/views.py index 008b3dd..b6f20c2 100644 --- a/netbox_diode_plugin/api/views.py +++ b/netbox_diode_plugin/api/views.py @@ -18,6 +18,7 @@ from . 
import customfield_cache # noqa: F401 - imported for side-effect install at module load from .applier import apply_changeset from .authentication import DiodeOAuth2Authentication +from .change_log_buffer import buffered_change_logging, request_change_logging_batch from .change_log_bypass import bypass_change_logging from .common import ( ChangeSet, @@ -129,12 +130,21 @@ def _apply_one_changeset(change_set: ChangeSet, request) -> ChangeSetResult: NetBox has no built-in periodic reindex; deployments enabling this bypass must schedule ``manage.py reindex`` (or a system_job) to keep the UI search box current. + + ``buffered_change_logging`` is layered on the same signal chain + but takes a different trade-off: it keeps the audit trail and the + receivers connected to ``post_save(sender=ObjectChange)`` that + consume it, and just batches the writes. Gated by + ``apply_buffer_change_logging`` (default off). Has no effect when + ``apply_bypass_change_logging`` is also on - the bypass returns + early and the buffer never receives any events. """ try: with ( transaction.atomic(), bypass_counter_updates(), bypass_change_logging(), + buffered_change_logging(), bypass_search_indexing(), ): return apply_changeset(change_set, request) @@ -489,24 +499,28 @@ def _post(self, request, *args, **kwargs): if len(change_sets) == 0: raise ValidationError({"change_sets": ["change_sets must not be empty"]}) - results = [] - for entry in change_sets: - if not isinstance(entry, dict): - results.append( - ChangeSetResult( - errors={"request": {"change_set": ["change_set must be an object"]}} + # Wrap the batch in the request-scope change-log batch so all + # per-changeset buffered ObjectChange rows are flushed as a + # single bulk_create, not one per changeset. 
+ with request_change_logging_batch(): + results = [] + for entry in change_sets: + if not isinstance(entry, dict): + results.append( + ChangeSetResult( + errors={"request": {"change_set": ["change_set must be an object"]}} + ).to_dict() + ) + continue + try: + change_set = ChangeSet.from_dict(entry) + result = _apply_one_changeset(change_set, request).to_dict() + except Exception as e: + logger.error(f"Error parsing batch entry: {e}") + result = ChangeSetResult( + errors={"request": {"change_set": [f"invalid change_set: {e}"]}} ).to_dict() - ) - continue - try: - change_set = ChangeSet.from_dict(entry) - result = _apply_one_changeset(change_set, request).to_dict() - except Exception as e: - logger.error(f"Error parsing batch entry: {e}") - result = ChangeSetResult( - errors={"request": {"change_set": [f"invalid change_set: {e}"]}} - ).to_dict() - results.append(result) + results.append(result) http_status = ( status.HTTP_207_MULTI_STATUS @@ -578,12 +592,18 @@ def _post(self, request, *args, **kwargs): obj_token = enter_request_obj_cache() prechange_token = enter_prechange_cache() try: - results = [] - for entry in entities: - entity_id = entry.get("id") - result = self._process_entity(entry, branch_schema_id, request) - result["id"] = entity_id - results.append(result) + # Wrap the entity loop so that all per-entity buffered + # ObjectChange rows land in a single batch and are flushed + # as ONE bulk_create at end of request. Without this wrapper + # each entity flushes its own buffer (still correct, but + # resolves m2m relations per entity instead of in bulk). 
+ with request_change_logging_batch(): + results = [] + for entry in entities: + entity_id = entry.get("id") + result = self._process_entity(entry, branch_schema_id, request) + result["id"] = entity_id + results.append(result) finally: exit_prechange_cache(prechange_token) exit_request_obj_cache(obj_token) diff --git a/netbox_diode_plugin/tests/v4.4.x/tests/test_api_bulk_plan_apply.py b/netbox_diode_plugin/tests/v4.4.x/tests/test_api_bulk_plan_apply.py index 0e9bd5c..e296e0f 100644 --- a/netbox_diode_plugin/tests/v4.4.x/tests/test_api_bulk_plan_apply.py +++ b/netbox_diode_plugin/tests/v4.4.x/tests/test_api_bulk_plan_apply.py @@ -5,8 +5,9 @@ import logging from types import SimpleNamespace from unittest import mock +from uuid import uuid4 -from dcim.models import Site +from dcim.models import MACAddress, Site from rest_framework import status from utilities.testing import APITestCase @@ -376,6 +377,106 @@ def test_unauthenticated_request_returns_403(self): response = self.client.post(self.url, data={"entities": []}, format="json") self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + # --- Cross-request plan race must not produce duplicate MACAddress --- + + def test_concurrent_plans_dedupe_macaddress_via_pre_save_match(self): + """ + Two requests planning the same MAC must apply to a single row. + + Two reconciler workers planning equivalent change_sets for the same + interface + MAC each see no existing MAC row and each plan a CREATE. + Two sequential ``/bulk-plan/`` calls model this exactly: each request + has its own request-scoped obj_cache, so plan B cannot see plan A's + pending CREATE. + + NetBox has no DB-level unique constraint on + (mac_address, assigned_object_type, assigned_object_id), so the + applier dedupes by routing dcim.macaddress through the find-first + CREATE path (matcher.requires_pre_save_match). Apply B's CREATE + therefore matches the row apply A just committed instead of + inserting a second one. 
+ """ + suffix = uuid4().hex[:8] + mac = "00:00:00:00:00:42" + + plan_payload = { + "entities": [ + { + "id": f"race-{suffix}", + "object_type": "dcim.interface", + "entity": { + "interface": { + "name": f"eth0-{suffix}", + "type": "1000base-t", + "device": { + "name": f"dev-{suffix}", + "role": {"name": f"role-{suffix}"}, + "site": {"name": f"site-{suffix}"}, + "device_type": { + "manufacturer": {"name": f"mfr-{suffix}"}, + "model": f"dt-{suffix}", + }, + }, + "primary_mac_address": {"mac_address": mac}, + }, + }, + } + ] + } + + plan_url = "/netbox/api/plugins/diode/bulk-plan/" + apply_url = "/netbox/api/plugins/diode/bulk-apply/" + + plan_a = self.client.post( + plan_url, data=plan_payload, format="json", **self.authorization_header + ) + plan_b = self.client.post( + plan_url, data=plan_payload, format="json", **self.authorization_header + ) + self.assertEqual(plan_a.status_code, status.HTTP_200_OK, plan_a.json()) + self.assertEqual(plan_b.status_code, status.HTTP_200_OK, plan_b.json()) + + result_a = plan_a.json()["results"][0] + result_b = plan_b.json()["results"][0] + self.assertIsNone(result_a.get("errors"), result_a) + self.assertIsNone(result_b.get("errors"), result_b) + cs_a = result_a.get("change_set") + cs_b = result_b.get("change_set") + self.assertIsNotNone(cs_a, result_a) + self.assertIsNotNone(cs_b, result_b) + + def mac_creates(change_set): + return [ + c for c in change_set["changes"] + if c["object_type"] == "dcim.macaddress" and c["change_type"] == "create" + ] + + self.assertEqual(len(mac_creates(cs_a)), 1, cs_a) + self.assertEqual(len(mac_creates(cs_b)), 1, cs_b) + + apply_a = self.client.post( + apply_url, + data={"change_sets": [cs_a]}, + format="json", + **self.authorization_header, + ) + apply_b = self.client.post( + apply_url, + data={"change_sets": [cs_b]}, + format="json", + **self.authorization_header, + ) + self.assertEqual(apply_a.status_code, status.HTTP_200_OK, apply_a.json()) + self.assertEqual(apply_b.status_code, 
status.HTTP_200_OK, apply_b.json()) + + macs = MACAddress.objects.filter(mac_address=mac) + self.assertEqual( + macs.count(), + 1, + f"expected exactly one MAC row after dedup, got {macs.count()}: " + f"{list(macs.values('pk', 'mac_address', 'assigned_object_id'))}", + ) + def test_insufficient_scope_returns_403(self): """Token with only read scope cannot call bulk-plan-apply (requires write).""" read_only_user = SimpleNamespace( diff --git a/netbox_diode_plugin/tests/v4.4.x/tests/test_change_log_buffer.py b/netbox_diode_plugin/tests/v4.4.x/tests/test_change_log_buffer.py new file mode 100644 index 0000000..a8cc45c --- /dev/null +++ b/netbox_diode_plugin/tests/v4.4.x/tests/test_change_log_buffer.py @@ -0,0 +1,460 @@ +#!/usr/bin/env python +# Copyright 2026 NetBox Labs, Inc. +"""Diode NetBox Plugin - buffered_change_logging tests.""" + +import logging +from types import SimpleNamespace +from unittest import mock +from uuid import uuid4 + +from core.models import ObjectChange +from dcim.models import Site +from django.contrib.contenttypes.models import ContentType +from django.test import TestCase +from extras.models import Tag +from ipam.models import ASN, RIR +from netbox.config import get_config +from rest_framework import status +from utilities.testing import APITestCase + +from netbox_diode_plugin.api import change_log_buffer, views +from netbox_diode_plugin.api.authentication import DiodeOAuth2Authentication +from netbox_diode_plugin.plugin_config import get_diode_user + +logger = logging.getLogger(__name__) + + +class BufferedChangeLoggingApplyTestCase(APITestCase): + """End-to-end behaviour of `buffered_change_logging` via `/bulk-plan-apply/`.""" + + def setUp(self): + """Auth + clean ObjectChange table for predictable counts.""" + self.url = "/netbox/api/plugins/diode/bulk-plan-apply/" + self.authorization_header = {"HTTP_AUTHORIZATION": "Bearer mocked_oauth_token"} + self.diode_user = SimpleNamespace( + user=get_diode_user(), + token_scopes=["netbox:read", 
"netbox:write"], + token_data={"scope": "netbox:read netbox:write"}, + ) + self.introspect_patcher = mock.patch.object( + DiodeOAuth2Authentication, + "_introspect_token", + return_value=self.diode_user, + ) + self.introspect_patcher.start() + + ObjectChange.objects.all().delete() + + def tearDown(self): + """Stop the auth patcher.""" + self.introspect_patcher.stop() + super().tearDown() + + def _make_payload(self, suffix): + """Build a single-entity bulk-plan-apply payload that creates a fresh Site.""" + return { + "entities": [ + { + "id": f"entity-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Site {suffix}", "slug": f"site-{suffix}"}}, + } + ] + } + + def _site_change_count(self): + """Return the number of ObjectChange rows whose changed_object is a Site.""" + return ObjectChange.objects.filter( + changed_object_type__app_label="dcim", + changed_object_type__model="site", + ).count() + + # --- Setting OFF: buffer is a pass-through, upstream writes synchronously --- + + def test_setting_false_writes_objectchange_synchronously(self): + """With the setting off (default), the apply writes ObjectChange synchronously via upstream.""" + response = self.client.post( + self.url, + data=self._make_payload(uuid4().hex[:8]), + format="json", + **self.authorization_header, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + # Upstream synchronous path -> row already in DB at request return. + self.assertEqual(self._site_change_count(), 1) + + # --- Setting ON: buffer collects + flushes one bulk_create at commit --- + + def test_setting_true_flushes_objectchange_on_commit(self): + """With the setting on, the buffered ObjectChange is written by the on_commit flush.""" + suffix = uuid4().hex[:8] + # Django's TestCase wraps each test in a transaction that is + # rolled back at end-of-test, which means `transaction.on_commit` + # callbacks normally never fire. 
`captureOnCommitCallbacks` runs + # them explicitly on context exit so we can assert the flush. + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + self.captureOnCommitCallbacks(execute=True): + response = self.client.post( + self.url, data=self._make_payload(suffix), format="json", **self.authorization_header + ) + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + + # Apply transaction committed -> on_commit fired -> exactly one row. + self.assertEqual(self._site_change_count(), 1) + oc = ObjectChange.objects.get( + changed_object_type__app_label="dcim", changed_object_type__model="site" + ) + self.assertEqual(oc.action, "create") + self.assertEqual(oc.object_repr, f"Site {suffix}") + + # --- Rollback: no flush on apply failure --- + + def test_rollback_does_not_flush(self): + """If apply_changeset raises, transaction.on_commit never fires and nothing is written.""" + suffix = uuid4().hex[:8] + + def failing_apply(change_set, request): + Site.objects.create(name=f"Doomed {suffix}", slug=f"doomed-{suffix}") + raise RuntimeError("forced rollback") + + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + mock.patch.object(views, "apply_changeset", side_effect=failing_apply), \ + mock.patch.object(change_log_buffer, "_flush_objectchanges") as mock_flush, \ + self.captureOnCommitCallbacks(execute=True): + self.client.post( + self.url, data=self._make_payload(suffix), format="json", **self.authorization_header + ) + + # Outer atomic rolled back the Site; the append on_commit was discarded. + self.assertFalse(Site.objects.filter(slug=f"doomed-{suffix}").exists()) + # Flush still runs at request end but the batch is empty. 
+ if mock_flush.called: + self.assertEqual(list(mock_flush.call_args.args[0]), []) + + # --- Bypass wins when both flags are enabled --- + + def test_bypass_takes_precedence_over_buffer_when_both_active(self): + """When apply_bypass_change_logging is active, the buffer never collects and nothing is written.""" + suffix = uuid4().hex[:8] + + bypass_token = change_log_buffer._bypass_active.set(True) + try: + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + self.captureOnCommitCallbacks(execute=True): + response = self.client.post( + self.url, data=self._make_payload(suffix), format="json", **self.authorization_header + ) + finally: + change_log_buffer._bypass_active.reset(bypass_token) + + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + # Site was created (apply itself wasn't bypassed), but no ObjectChange. + self.assertTrue(Site.objects.filter(slug=f"site-{suffix}").exists()) + self.assertEqual(self._site_change_count(), 0) + + # --- Request-level batching: many entities -> one consolidated flush --- + + def test_multi_entity_request_flushes_one_batch(self): + """3 entities in one request -> ONE flush carrying all 3 entities' rows.""" + suffix = uuid4().hex[:8] + payload = { + "entities": [ + { + "id": f"entity-{i}-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Site {i} {suffix}", "slug": f"site-{i}-{suffix}"}}, + } + for i in range(3) + ] + } + + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + mock.patch.object( + change_log_buffer, "_flush_objectchanges", + side_effect=change_log_buffer._flush_objectchanges, + ) as spy_flush, \ + self.captureOnCommitCallbacks(execute=True): + response = self.client.post( + self.url, data=payload, format="json", **self.authorization_header + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + # Single flush for the whole request, not one per entity. 
+ spy_flush.assert_called_once() + flushed = list(spy_flush.call_args.args[0]) + self.assertEqual(len(flushed), 3) + reprs = {oc.object_repr for oc in flushed} + self.assertEqual(reprs, {f"Site 0 {suffix}", f"Site 1 {suffix}", f"Site 2 {suffix}"}) + self.assertEqual(self._site_change_count(), 3) + + def test_failed_entity_excluded_from_batch(self): + """When one entity fails, its rows are dropped from the consolidated flush.""" + suffix = uuid4().hex[:8] + good_slug = f"good-{suffix}" + bad_slug = f"bad-{suffix}" + payload = { + "entities": [ + { + "id": f"good-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Good {suffix}", "slug": good_slug}}, + }, + { + "id": f"bad-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Bad {suffix}", "slug": bad_slug}}, + }, + ] + } + + original_apply = views.apply_changeset + call_count = {"n": 0} + + def selective_failing_apply(change_set, request): + call_count["n"] += 1 + if call_count["n"] == 1: + return original_apply(change_set, request) + Site.objects.create(name=f"Bad inner {suffix}", slug=bad_slug) + raise RuntimeError("forced rollback for second entity") + + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + mock.patch.object(views, "apply_changeset", side_effect=selective_failing_apply), \ + mock.patch.object( + change_log_buffer, "_flush_objectchanges", + side_effect=change_log_buffer._flush_objectchanges, + ) as spy_flush, \ + self.captureOnCommitCallbacks(execute=True): + self.client.post( + self.url, data=payload, format="json", **self.authorization_header + ) + + # Good entity made it through; bad entity rolled back. + self.assertTrue(Site.objects.filter(slug=good_slug).exists()) + self.assertFalse(Site.objects.filter(slug=bad_slug).exists()) + # Flush happens but only contains the good row. 
+ spy_flush.assert_called_once() + flushed = list(spy_flush.call_args.args[0]) + self.assertEqual(len(flushed), 1) + self.assertEqual(flushed[0].object_repr, f"Good {suffix}") + + +class FastSerializeTestCase(TestCase): + """`_fast_serialize_object` parity and the buffer-active gate.""" + + def setUp(self): + """Create a Site with an m2m relation (asns) and a tag.""" + ObjectChange.objects.all().delete() + self.site = Site.objects.create(name="Parity site", slug=f"parity-{uuid4().hex[:8]}") + self.site.tags.add( + Tag.objects.create(name="alpha", slug="alpha"), + Tag.objects.create(name="beta", slug="beta"), + ) + rir = RIR.objects.create(name="Test RIR", slug=f"rir-{uuid4().hex[:8]}") + self.asn = ASN.objects.create(asn=65001, rir=rir) + self.site.asns.add(self.asn) + + def test_fast_serialize_omits_m2m_and_placeholders_tags(self): + """Fast output equals upstream minus m2m, with tags left as an empty placeholder.""" + vanilla = change_log_buffer._original_serialize_object(self.site, exclude=[]) + fast = change_log_buffer._fast_serialize_object(self.site, exclude=[]) + + # `asns` is the only serialisable m2m on Site; the fast path drops it. + m2m_names = {f.name for f in Site._meta.local_many_to_many if f.serialize} + self.assertIn("asns", m2m_names) + self.assertNotIn("asns", fast) + # Tags are a placeholder, resolved in bulk at flush, not per-save. + self.assertEqual(fast["tags"], []) + + # Every other field matches upstream exactly. 
+ expected = {k: v for k, v in vanilla.items() if k not in m2m_names and k != "tags"} + actual = {k: v for k, v in fast.items() if k != "tags"} + self.assertEqual(actual, expected) + + def test_full_parity_after_enrichment(self): + """Fast serialize + m2m + tag enrichment reproduces the upstream serializer output.""" + vanilla = change_log_buffer._original_serialize_object(self.site, exclude=[]) + row = ObjectChange( + changed_object_type_id=ContentType.objects.get_for_model(Site).id, + changed_object_id=self.site.pk, + action="create", + postchange_data=change_log_buffer._fast_serialize_object(self.site, exclude=[]), + ) + change_log_buffer._enrich_m2m([row]) + change_log_buffer._enrich_tags([row]) + + # m2m ordering: enrichment sorts, upstream uses queryset order; + # compare membership for the relation, then the rest exactly. + self.assertEqual(set(row.postchange_data.pop("asns")), set(vanilla.pop("asns"))) + self.assertEqual(row.postchange_data, vanilla) + + def test_serialize_object_gate_is_inactive_without_buffer(self): + """With no buffer active, serialize_object delegates to the upstream implementation.""" + # Identical output to the captured original means the gate did not + # engage the fast path. 
+ self.assertEqual( + self.site.serialize_object(), + change_log_buffer._original_serialize_object(self.site), + ) + + def test_serialize_object_gate_engages_fast_path_with_buffer(self): + """With a buffer active, serialize_object routes through the fast path (no m2m).""" + token = change_log_buffer._apply_change_buffer.set({}) + try: + gated = self.site.serialize_object() + finally: + change_log_buffer._apply_change_buffer.reset(token) + self.assertNotIn("asns", gated) + + +class EnrichM2MTestCase(TestCase): + """`_enrich_m2m` re-adds m2m relations to buffered rows in bulk.""" + + def setUp(self): + """Two Sites, each with distinct ASNs, to exercise per-object grouping.""" + ObjectChange.objects.all().delete() + rir = RIR.objects.create(name="Enrich RIR", slug=f"erir-{uuid4().hex[:8]}") + self.asn1 = ASN.objects.create(asn=65010, rir=rir) + self.asn2 = ASN.objects.create(asn=65011, rir=rir) + self.site_a = Site.objects.create(name="Enrich A", slug=f"enrich-a-{uuid4().hex[:8]}") + self.site_b = Site.objects.create(name="Enrich B", slug=f"enrich-b-{uuid4().hex[:8]}") + self.site_a.asns.add(self.asn1, self.asn2) + self.site_b.asns.add(self.asn1) + self.ct_id = ContentType.objects.get_for_model(Site).id + + def _row(self, site): + return ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=site.pk, + action="create", + postchange_data=change_log_buffer._fast_serialize_object(site, exclude=[]), + ) + + def test_enrich_populates_m2m_per_object(self): + """Each row's postchange_data gets its own object's relation, sorted.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + change_log_buffer._enrich_m2m(rows) + + self.assertEqual(rows[0].postchange_data["asns"], sorted([self.asn1.pk, self.asn2.pk])) + self.assertEqual(rows[1].postchange_data["asns"], [self.asn1.pk]) + + def test_enriched_membership_matches_upstream(self): + """After enrichment, m2m membership matches what the upstream serializer records.""" + vanilla = 
change_log_buffer._original_serialize_object(self.site_a, exclude=[]) + row = self._row(self.site_a) + change_log_buffer._enrich_m2m([row]) + self.assertEqual(set(row.postchange_data["asns"]), set(vanilla["asns"])) + + def test_enrich_is_bulk_one_query_per_relation(self): + """Resolving N objects' relations costs one query per relation, not one per object.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + # Site has a single serialisable m2m (asns) -> exactly one query. + with self.assertNumQueries(1): + change_log_buffer._enrich_m2m(rows) + + +class EnrichTagsTestCase(TestCase): + """`_enrich_tags` fills the tag placeholder left by the fast serializer in bulk.""" + + def setUp(self): + """Two Sites with overlapping tags to exercise per-object grouping.""" + ObjectChange.objects.all().delete() + self.red = Tag.objects.create(name="red", slug="red") + self.blue = Tag.objects.create(name="blue", slug="blue") + self.site_a = Site.objects.create(name="Tag A", slug=f"tag-a-{uuid4().hex[:8]}") + self.site_b = Site.objects.create(name="Tag B", slug=f"tag-b-{uuid4().hex[:8]}") + self.site_a.tags.add(self.red, self.blue) + self.site_b.tags.add(self.red) + self.ct_id = ContentType.objects.get_for_model(Site).id + + def _row(self, site): + return ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=site.pk, + action="create", + postchange_data=change_log_buffer._fast_serialize_object(site, exclude=[]), + ) + + def test_enrich_fills_tags_per_object_sorted(self): + """Each row gets its own object's tag names, sorted.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + change_log_buffer._enrich_tags(rows) + self.assertEqual(rows[0].postchange_data["tags"], ["blue", "red"]) + self.assertEqual(rows[1].postchange_data["tags"], ["red"]) + + def test_enrich_tags_one_query_per_content_type(self): + """N taggable objects of one model cost a single tag query, not one per object.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + with 
self.assertNumQueries(1): + change_log_buffer._enrich_tags(rows) + + def test_enrich_tags_empty_for_untagged_object(self): + """A taggable object with no tags keeps the empty placeholder.""" + site_c = Site.objects.create(name="Tag C", slug=f"tag-c-{uuid4().hex[:8]}") + row = self._row(site_c) + change_log_buffer._enrich_tags([row]) + self.assertEqual(row.postchange_data["tags"], []) + + def test_enrich_tags_skips_rows_without_placeholder(self): + """Rows whose postchange_data has no `tags` key (non-taggable shape) are not given one.""" + row = ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=self.site_a.pk, + action="create", + postchange_data={"name": "no-tags-key"}, + ) + change_log_buffer._enrich_tags([row]) + self.assertNotIn("tags", row.postchange_data) + + +class SnapshotForApplyTestCase(TestCase): + """`snapshot_for_apply` captures prechange consistent with the buffered postchange.""" + + def setUp(self): + """A Site with an m2m relation (asns) and a tag.""" + ObjectChange.objects.all().delete() + self.site = Site.objects.create(name="Snap site", slug=f"snap-{uuid4().hex[:8]}") + self.site.tags.add(Tag.objects.create(name="green", slug="green")) + rir = RIR.objects.create(name="Snap RIR", slug=f"srir-{uuid4().hex[:8]}") + self.asn = ASN.objects.create(asn=65100, rir=rir) + self.site.asns.add(self.asn) + self.ct_id = ContentType.objects.get_for_model(Site).id + self._exclude = ["last_updated"] if get_config().CHANGELOG_SKIP_EMPTY_CHANGES else [] + + def _snapshot_with_buffer(self): + token = change_log_buffer._apply_change_buffer.set({}) + try: + change_log_buffer.snapshot_for_apply(self.site) + finally: + change_log_buffer._apply_change_buffer.reset(token) + return self.site._prechange_snapshot + + def test_buffer_active_captures_sorted_m2m_and_tags(self): + """With the buffer active, prechange records m2m + tags resolved now, sorted.""" + snap = self._snapshot_with_buffer() + self.assertEqual(snap["asns"], [self.asn.pk]) + 
self.assertEqual(snap["tags"], ["green"]) + + def test_prechange_matches_postchange_for_unchanged_object(self): + """Prechange and postchange are identical for an unmodified object (no spurious diff).""" + prechange = self._snapshot_with_buffer() + + # Build postchange exactly as the buffered flush would. + row = ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=self.site.pk, + action="update", + postchange_data=change_log_buffer._fast_serialize_object(self.site, exclude=self._exclude), + ) + change_log_buffer._enrich_m2m([row]) + change_log_buffer._enrich_tags([row]) + + self.assertEqual(prechange, row.postchange_data) + + def test_buffer_inactive_delegates_to_netbox_snapshot(self): + """With no buffer, prechange comes from NetBox's own snapshot (full serializer).""" + if hasattr(self.site, "_prechange_snapshot"): + del self.site._prechange_snapshot + change_log_buffer.snapshot_for_apply(self.site) + self.assertTrue(hasattr(self.site, "_prechange_snapshot")) + # The full serializer includes the m2m relation directly. 
+ self.assertIn("asns", self.site._prechange_snapshot) diff --git a/netbox_diode_plugin/tests/v4.5.x/tests/test_api_bulk_plan_apply.py b/netbox_diode_plugin/tests/v4.5.x/tests/test_api_bulk_plan_apply.py index 0e9bd5c..e296e0f 100644 --- a/netbox_diode_plugin/tests/v4.5.x/tests/test_api_bulk_plan_apply.py +++ b/netbox_diode_plugin/tests/v4.5.x/tests/test_api_bulk_plan_apply.py @@ -5,8 +5,9 @@ import logging from types import SimpleNamespace from unittest import mock +from uuid import uuid4 -from dcim.models import Site +from dcim.models import MACAddress, Site from rest_framework import status from utilities.testing import APITestCase @@ -376,6 +377,106 @@ def test_unauthenticated_request_returns_403(self): response = self.client.post(self.url, data={"entities": []}, format="json") self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + # --- Cross-request plan race must not produce duplicate MACAddress --- + + def test_concurrent_plans_dedupe_macaddress_via_pre_save_match(self): + """ + Two requests planning the same MAC must apply to a single row. + + Two reconciler workers planning equivalent change_sets for the same + interface + MAC each see no existing MAC row and each plan a CREATE. + Two sequential ``/bulk-plan/`` calls model this exactly: each request + has its own request-scoped obj_cache, so plan B cannot see plan A's + pending CREATE. + + NetBox has no DB-level unique constraint on + (mac_address, assigned_object_type, assigned_object_id), so the + applier dedupes by routing dcim.macaddress through the find-first + CREATE path (matcher.requires_pre_save_match). Apply B's CREATE + therefore matches the row apply A just committed instead of + inserting a second one. 
+ """ + suffix = uuid4().hex[:8] + mac = "00:00:00:00:00:42" + + plan_payload = { + "entities": [ + { + "id": f"race-{suffix}", + "object_type": "dcim.interface", + "entity": { + "interface": { + "name": f"eth0-{suffix}", + "type": "1000base-t", + "device": { + "name": f"dev-{suffix}", + "role": {"name": f"role-{suffix}"}, + "site": {"name": f"site-{suffix}"}, + "device_type": { + "manufacturer": {"name": f"mfr-{suffix}"}, + "model": f"dt-{suffix}", + }, + }, + "primary_mac_address": {"mac_address": mac}, + }, + }, + } + ] + } + + plan_url = "/netbox/api/plugins/diode/bulk-plan/" + apply_url = "/netbox/api/plugins/diode/bulk-apply/" + + plan_a = self.client.post( + plan_url, data=plan_payload, format="json", **self.authorization_header + ) + plan_b = self.client.post( + plan_url, data=plan_payload, format="json", **self.authorization_header + ) + self.assertEqual(plan_a.status_code, status.HTTP_200_OK, plan_a.json()) + self.assertEqual(plan_b.status_code, status.HTTP_200_OK, plan_b.json()) + + result_a = plan_a.json()["results"][0] + result_b = plan_b.json()["results"][0] + self.assertIsNone(result_a.get("errors"), result_a) + self.assertIsNone(result_b.get("errors"), result_b) + cs_a = result_a.get("change_set") + cs_b = result_b.get("change_set") + self.assertIsNotNone(cs_a, result_a) + self.assertIsNotNone(cs_b, result_b) + + def mac_creates(change_set): + return [ + c for c in change_set["changes"] + if c["object_type"] == "dcim.macaddress" and c["change_type"] == "create" + ] + + self.assertEqual(len(mac_creates(cs_a)), 1, cs_a) + self.assertEqual(len(mac_creates(cs_b)), 1, cs_b) + + apply_a = self.client.post( + apply_url, + data={"change_sets": [cs_a]}, + format="json", + **self.authorization_header, + ) + apply_b = self.client.post( + apply_url, + data={"change_sets": [cs_b]}, + format="json", + **self.authorization_header, + ) + self.assertEqual(apply_a.status_code, status.HTTP_200_OK, apply_a.json()) + self.assertEqual(apply_b.status_code, 
status.HTTP_200_OK, apply_b.json()) + + macs = MACAddress.objects.filter(mac_address=mac) + self.assertEqual( + macs.count(), + 1, + f"expected exactly one MAC row after dedup, got {macs.count()}: " + f"{list(macs.values('pk', 'mac_address', 'assigned_object_id'))}", + ) + def test_insufficient_scope_returns_403(self): """Token with only read scope cannot call bulk-plan-apply (requires write).""" read_only_user = SimpleNamespace( diff --git a/netbox_diode_plugin/tests/v4.5.x/tests/test_change_log_buffer.py b/netbox_diode_plugin/tests/v4.5.x/tests/test_change_log_buffer.py new file mode 100644 index 0000000..a8cc45c --- /dev/null +++ b/netbox_diode_plugin/tests/v4.5.x/tests/test_change_log_buffer.py @@ -0,0 +1,460 @@ +#!/usr/bin/env python +# Copyright 2026 NetBox Labs, Inc. +"""Diode NetBox Plugin - buffered_change_logging tests.""" + +import logging +from types import SimpleNamespace +from unittest import mock +from uuid import uuid4 + +from core.models import ObjectChange +from dcim.models import Site +from django.contrib.contenttypes.models import ContentType +from django.test import TestCase +from extras.models import Tag +from ipam.models import ASN, RIR +from netbox.config import get_config +from rest_framework import status +from utilities.testing import APITestCase + +from netbox_diode_plugin.api import change_log_buffer, views +from netbox_diode_plugin.api.authentication import DiodeOAuth2Authentication +from netbox_diode_plugin.plugin_config import get_diode_user + +logger = logging.getLogger(__name__) + + +class BufferedChangeLoggingApplyTestCase(APITestCase): + """End-to-end behaviour of `buffered_change_logging` via `/bulk-plan-apply/`.""" + + def setUp(self): + """Auth + clean ObjectChange table for predictable counts.""" + self.url = "/netbox/api/plugins/diode/bulk-plan-apply/" + self.authorization_header = {"HTTP_AUTHORIZATION": "Bearer mocked_oauth_token"} + self.diode_user = SimpleNamespace( + user=get_diode_user(), + token_scopes=["netbox:read", 
"netbox:write"], + token_data={"scope": "netbox:read netbox:write"}, + ) + self.introspect_patcher = mock.patch.object( + DiodeOAuth2Authentication, + "_introspect_token", + return_value=self.diode_user, + ) + self.introspect_patcher.start() + + ObjectChange.objects.all().delete() + + def tearDown(self): + """Stop the auth patcher.""" + self.introspect_patcher.stop() + super().tearDown() + + def _make_payload(self, suffix): + """Build a single-entity bulk-plan-apply payload that creates a fresh Site.""" + return { + "entities": [ + { + "id": f"entity-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Site {suffix}", "slug": f"site-{suffix}"}}, + } + ] + } + + def _site_change_count(self): + """Return the number of ObjectChange rows whose changed_object is a Site.""" + return ObjectChange.objects.filter( + changed_object_type__app_label="dcim", + changed_object_type__model="site", + ).count() + + # --- Setting OFF: buffer is a pass-through, upstream writes synchronously --- + + def test_setting_false_writes_objectchange_synchronously(self): + """With the setting off (default), the apply writes ObjectChange synchronously via upstream.""" + response = self.client.post( + self.url, + data=self._make_payload(uuid4().hex[:8]), + format="json", + **self.authorization_header, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + # Upstream synchronous path -> row already in DB at request return. + self.assertEqual(self._site_change_count(), 1) + + # --- Setting ON: buffer collects + flushes one bulk_create at commit --- + + def test_setting_true_flushes_objectchange_on_commit(self): + """With the setting on, the buffered ObjectChange is written by the on_commit flush.""" + suffix = uuid4().hex[:8] + # Django's TestCase wraps each test in a transaction that is + # rolled back at end-of-test, which means `transaction.on_commit` + # callbacks normally never fire. 
`captureOnCommitCallbacks` runs + # them explicitly on context exit so we can assert the flush. + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + self.captureOnCommitCallbacks(execute=True): + response = self.client.post( + self.url, data=self._make_payload(suffix), format="json", **self.authorization_header + ) + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + + # Apply transaction committed -> on_commit fired -> exactly one row. + self.assertEqual(self._site_change_count(), 1) + oc = ObjectChange.objects.get( + changed_object_type__app_label="dcim", changed_object_type__model="site" + ) + self.assertEqual(oc.action, "create") + self.assertEqual(oc.object_repr, f"Site {suffix}") + + # --- Rollback: no flush on apply failure --- + + def test_rollback_does_not_flush(self): + """If apply_changeset raises, transaction.on_commit never fires and nothing is written.""" + suffix = uuid4().hex[:8] + + def failing_apply(change_set, request): + Site.objects.create(name=f"Doomed {suffix}", slug=f"doomed-{suffix}") + raise RuntimeError("forced rollback") + + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + mock.patch.object(views, "apply_changeset", side_effect=failing_apply), \ + mock.patch.object(change_log_buffer, "_flush_objectchanges") as mock_flush, \ + self.captureOnCommitCallbacks(execute=True): + self.client.post( + self.url, data=self._make_payload(suffix), format="json", **self.authorization_header + ) + + # Outer atomic rolled back the Site; the append on_commit was discarded. + self.assertFalse(Site.objects.filter(slug=f"doomed-{suffix}").exists()) + # Flush still runs at request end but the batch is empty. 
+ if mock_flush.called: + self.assertEqual(list(mock_flush.call_args.args[0]), []) + + # --- Bypass wins when both flags are enabled --- + + def test_bypass_takes_precedence_over_buffer_when_both_active(self): + """When apply_bypass_change_logging is active, the buffer never collects and nothing is written.""" + suffix = uuid4().hex[:8] + + bypass_token = change_log_buffer._bypass_active.set(True) + try: + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + self.captureOnCommitCallbacks(execute=True): + response = self.client.post( + self.url, data=self._make_payload(suffix), format="json", **self.authorization_header + ) + finally: + change_log_buffer._bypass_active.reset(bypass_token) + + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + # Site was created (apply itself wasn't bypassed), but no ObjectChange. + self.assertTrue(Site.objects.filter(slug=f"site-{suffix}").exists()) + self.assertEqual(self._site_change_count(), 0) + + # --- Request-level batching: many entities -> one consolidated flush --- + + def test_multi_entity_request_flushes_one_batch(self): + """3 entities in one request -> ONE flush carrying all 3 entities' rows.""" + suffix = uuid4().hex[:8] + payload = { + "entities": [ + { + "id": f"entity-{i}-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Site {i} {suffix}", "slug": f"site-{i}-{suffix}"}}, + } + for i in range(3) + ] + } + + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + mock.patch.object( + change_log_buffer, "_flush_objectchanges", + side_effect=change_log_buffer._flush_objectchanges, + ) as spy_flush, \ + self.captureOnCommitCallbacks(execute=True): + response = self.client.post( + self.url, data=payload, format="json", **self.authorization_header + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + # Single flush for the whole request, not one per entity. 
+ spy_flush.assert_called_once() + flushed = list(spy_flush.call_args.args[0]) + self.assertEqual(len(flushed), 3) + reprs = {oc.object_repr for oc in flushed} + self.assertEqual(reprs, {f"Site 0 {suffix}", f"Site 1 {suffix}", f"Site 2 {suffix}"}) + self.assertEqual(self._site_change_count(), 3) + + def test_failed_entity_excluded_from_batch(self): + """When one entity fails, its rows are dropped from the consolidated flush.""" + suffix = uuid4().hex[:8] + good_slug = f"good-{suffix}" + bad_slug = f"bad-{suffix}" + payload = { + "entities": [ + { + "id": f"good-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Good {suffix}", "slug": good_slug}}, + }, + { + "id": f"bad-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Bad {suffix}", "slug": bad_slug}}, + }, + ] + } + + original_apply = views.apply_changeset + call_count = {"n": 0} + + def selective_failing_apply(change_set, request): + call_count["n"] += 1 + if call_count["n"] == 1: + return original_apply(change_set, request) + Site.objects.create(name=f"Bad inner {suffix}", slug=bad_slug) + raise RuntimeError("forced rollback for second entity") + + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + mock.patch.object(views, "apply_changeset", side_effect=selective_failing_apply), \ + mock.patch.object( + change_log_buffer, "_flush_objectchanges", + side_effect=change_log_buffer._flush_objectchanges, + ) as spy_flush, \ + self.captureOnCommitCallbacks(execute=True): + self.client.post( + self.url, data=payload, format="json", **self.authorization_header + ) + + # Good entity made it through; bad entity rolled back. + self.assertTrue(Site.objects.filter(slug=good_slug).exists()) + self.assertFalse(Site.objects.filter(slug=bad_slug).exists()) + # Flush happens but only contains the good row. 
+ spy_flush.assert_called_once() + flushed = list(spy_flush.call_args.args[0]) + self.assertEqual(len(flushed), 1) + self.assertEqual(flushed[0].object_repr, f"Good {suffix}") + + +class FastSerializeTestCase(TestCase): + """`_fast_serialize_object` parity and the buffer-active gate.""" + + def setUp(self): + """Create a Site with an m2m relation (asns) and two tags.""" + ObjectChange.objects.all().delete() + self.site = Site.objects.create(name="Parity site", slug=f"parity-{uuid4().hex[:8]}") + self.site.tags.add( + Tag.objects.create(name="alpha", slug="alpha"), + Tag.objects.create(name="beta", slug="beta"), + ) + rir = RIR.objects.create(name="Test RIR", slug=f"rir-{uuid4().hex[:8]}") + self.asn = ASN.objects.create(asn=65001, rir=rir) + self.site.asns.add(self.asn) + + def test_fast_serialize_omits_m2m_and_placeholders_tags(self): + """Fast output equals upstream minus m2m, with tags left as an empty placeholder.""" + vanilla = change_log_buffer._original_serialize_object(self.site, exclude=[]) + fast = change_log_buffer._fast_serialize_object(self.site, exclude=[]) + + # `asns` is the only serialisable m2m on Site; the fast path drops it. + m2m_names = {f.name for f in Site._meta.local_many_to_many if f.serialize} + self.assertIn("asns", m2m_names) + self.assertNotIn("asns", fast) + # Tags are a placeholder, resolved in bulk at flush, not per-save. + self.assertEqual(fast["tags"], []) + + # Every other field matches upstream exactly. 
+ expected = {k: v for k, v in vanilla.items() if k not in m2m_names and k != "tags"} + actual = {k: v for k, v in fast.items() if k != "tags"} + self.assertEqual(actual, expected) + + def test_full_parity_after_enrichment(self): + """Fast serialize + m2m + tag enrichment reproduces the upstream serializer output.""" + vanilla = change_log_buffer._original_serialize_object(self.site, exclude=[]) + row = ObjectChange( + changed_object_type_id=ContentType.objects.get_for_model(Site).id, + changed_object_id=self.site.pk, + action="create", + postchange_data=change_log_buffer._fast_serialize_object(self.site, exclude=[]), + ) + change_log_buffer._enrich_m2m([row]) + change_log_buffer._enrich_tags([row]) + + # m2m ordering: enrichment sorts, upstream uses queryset order; + # compare membership for the relation, then the rest exactly. + self.assertEqual(set(row.postchange_data.pop("asns")), set(vanilla.pop("asns"))) + self.assertEqual(row.postchange_data, vanilla) + + def test_serialize_object_gate_is_inactive_without_buffer(self): + """With no buffer active, serialize_object delegates to the upstream implementation.""" + # Identical output to the captured original means the gate did not + # engage the fast path. 
+ self.assertEqual( + self.site.serialize_object(), + change_log_buffer._original_serialize_object(self.site), + ) + + def test_serialize_object_gate_engages_fast_path_with_buffer(self): + """With a buffer active, serialize_object routes through the fast path (no m2m).""" + token = change_log_buffer._apply_change_buffer.set({}) + try: + gated = self.site.serialize_object() + finally: + change_log_buffer._apply_change_buffer.reset(token) + self.assertNotIn("asns", gated) + + +class EnrichM2MTestCase(TestCase): + """`_enrich_m2m` re-adds m2m relations to buffered rows in bulk.""" + + def setUp(self): + """Two Sites with overlapping ASNs to exercise per-object grouping.""" + ObjectChange.objects.all().delete() + rir = RIR.objects.create(name="Enrich RIR", slug=f"erir-{uuid4().hex[:8]}") + self.asn1 = ASN.objects.create(asn=65010, rir=rir) + self.asn2 = ASN.objects.create(asn=65011, rir=rir) + self.site_a = Site.objects.create(name="Enrich A", slug=f"enrich-a-{uuid4().hex[:8]}") + self.site_b = Site.objects.create(name="Enrich B", slug=f"enrich-b-{uuid4().hex[:8]}") + self.site_a.asns.add(self.asn1, self.asn2) + self.site_b.asns.add(self.asn1) + self.ct_id = ContentType.objects.get_for_model(Site).id + + def _row(self, site): + return ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=site.pk, + action="create", + postchange_data=change_log_buffer._fast_serialize_object(site, exclude=[]), + ) + + def test_enrich_populates_m2m_per_object(self): + """Each row's postchange_data gets its own object's relation, sorted.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + change_log_buffer._enrich_m2m(rows) + + self.assertEqual(rows[0].postchange_data["asns"], sorted([self.asn1.pk, self.asn2.pk])) + self.assertEqual(rows[1].postchange_data["asns"], [self.asn1.pk]) + + def test_enriched_membership_matches_upstream(self): + """After enrichment, m2m membership matches what the upstream serializer records.""" + vanilla = 
change_log_buffer._original_serialize_object(self.site_a, exclude=[]) + row = self._row(self.site_a) + change_log_buffer._enrich_m2m([row]) + self.assertEqual(set(row.postchange_data["asns"]), set(vanilla["asns"])) + + def test_enrich_is_bulk_one_query_per_relation(self): + """Resolving N objects' relations costs one query per relation, not one per object.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + # Site has a single serialisable m2m (asns) -> exactly one query. + with self.assertNumQueries(1): + change_log_buffer._enrich_m2m(rows) + + +class EnrichTagsTestCase(TestCase): + """`_enrich_tags` fills the tag placeholder left by the fast serializer in bulk.""" + + def setUp(self): + """Two Sites with overlapping tags to exercise per-object grouping.""" + ObjectChange.objects.all().delete() + self.red = Tag.objects.create(name="red", slug="red") + self.blue = Tag.objects.create(name="blue", slug="blue") + self.site_a = Site.objects.create(name="Tag A", slug=f"tag-a-{uuid4().hex[:8]}") + self.site_b = Site.objects.create(name="Tag B", slug=f"tag-b-{uuid4().hex[:8]}") + self.site_a.tags.add(self.red, self.blue) + self.site_b.tags.add(self.red) + self.ct_id = ContentType.objects.get_for_model(Site).id + + def _row(self, site): + return ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=site.pk, + action="create", + postchange_data=change_log_buffer._fast_serialize_object(site, exclude=[]), + ) + + def test_enrich_fills_tags_per_object_sorted(self): + """Each row gets its own object's tag names, sorted.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + change_log_buffer._enrich_tags(rows) + self.assertEqual(rows[0].postchange_data["tags"], ["blue", "red"]) + self.assertEqual(rows[1].postchange_data["tags"], ["red"]) + + def test_enrich_tags_one_query_per_content_type(self): + """N taggable objects of one model cost a single tag query, not one per object.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + with 
self.assertNumQueries(1): + change_log_buffer._enrich_tags(rows) + + def test_enrich_tags_empty_for_untagged_object(self): + """A taggable object with no tags keeps the empty placeholder.""" + site_c = Site.objects.create(name="Tag C", slug=f"tag-c-{uuid4().hex[:8]}") + row = self._row(site_c) + change_log_buffer._enrich_tags([row]) + self.assertEqual(row.postchange_data["tags"], []) + + def test_enrich_tags_skips_rows_without_placeholder(self): + """Rows whose postchange_data has no `tags` key (non-taggable shape) are not given one.""" + row = ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=self.site_a.pk, + action="create", + postchange_data={"name": "no-tags-key"}, + ) + change_log_buffer._enrich_tags([row]) + self.assertNotIn("tags", row.postchange_data) + + +class SnapshotForApplyTestCase(TestCase): + """`snapshot_for_apply` captures prechange consistent with the buffered postchange.""" + + def setUp(self): + """A Site with an m2m relation (asns) and a tag.""" + ObjectChange.objects.all().delete() + self.site = Site.objects.create(name="Snap site", slug=f"snap-{uuid4().hex[:8]}") + self.site.tags.add(Tag.objects.create(name="green", slug="green")) + rir = RIR.objects.create(name="Snap RIR", slug=f"srir-{uuid4().hex[:8]}") + self.asn = ASN.objects.create(asn=65100, rir=rir) + self.site.asns.add(self.asn) + self.ct_id = ContentType.objects.get_for_model(Site).id + self._exclude = ["last_updated"] if get_config().CHANGELOG_SKIP_EMPTY_CHANGES else [] + + def _snapshot_with_buffer(self): + token = change_log_buffer._apply_change_buffer.set({}) + try: + change_log_buffer.snapshot_for_apply(self.site) + finally: + change_log_buffer._apply_change_buffer.reset(token) + return self.site._prechange_snapshot + + def test_buffer_active_captures_sorted_m2m_and_tags(self): + """With the buffer active, prechange records m2m + tags resolved now, sorted.""" + snap = self._snapshot_with_buffer() + self.assertEqual(snap["asns"], [self.asn.pk]) + 
self.assertEqual(snap["tags"], ["green"]) + + def test_prechange_matches_postchange_for_unchanged_object(self): + """Prechange and postchange are identical for an unmodified object (no spurious diff).""" + prechange = self._snapshot_with_buffer() + + # Build postchange exactly as the buffered flush would. + row = ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=self.site.pk, + action="update", + postchange_data=change_log_buffer._fast_serialize_object(self.site, exclude=self._exclude), + ) + change_log_buffer._enrich_m2m([row]) + change_log_buffer._enrich_tags([row]) + + self.assertEqual(prechange, row.postchange_data) + + def test_buffer_inactive_delegates_to_netbox_snapshot(self): + """With no buffer, prechange comes from NetBox's own snapshot (full serializer).""" + if hasattr(self.site, "_prechange_snapshot"): + del self.site._prechange_snapshot + change_log_buffer.snapshot_for_apply(self.site) + self.assertTrue(hasattr(self.site, "_prechange_snapshot")) + # The full serializer includes the m2m relation directly. 
+ self.assertIn("asns", self.site._prechange_snapshot) diff --git a/netbox_diode_plugin/tests/v4.6.x/tests/test_api_bulk_plan_apply.py b/netbox_diode_plugin/tests/v4.6.x/tests/test_api_bulk_plan_apply.py index 0e9bd5c..e296e0f 100644 --- a/netbox_diode_plugin/tests/v4.6.x/tests/test_api_bulk_plan_apply.py +++ b/netbox_diode_plugin/tests/v4.6.x/tests/test_api_bulk_plan_apply.py @@ -5,8 +5,9 @@ import logging from types import SimpleNamespace from unittest import mock +from uuid import uuid4 -from dcim.models import Site +from dcim.models import MACAddress, Site from rest_framework import status from utilities.testing import APITestCase @@ -376,6 +377,106 @@ def test_unauthenticated_request_returns_403(self): response = self.client.post(self.url, data={"entities": []}, format="json") self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + # --- Cross-request plan race must not produce duplicate MACAddress --- + + def test_concurrent_plans_dedupe_macaddress_via_pre_save_match(self): + """ + Two requests planning the same MAC must apply to a single row. + + Two reconciler workers planning equivalent change_sets for the same + interface + MAC each see no existing MAC row and each plan a CREATE. + Two sequential ``/bulk-plan/`` calls model this exactly: each request + has its own request-scoped obj_cache, so plan B cannot see plan A's + pending CREATE. + + NetBox has no DB-level unique constraint on + (mac_address, assigned_object_type, assigned_object_id), so the + applier dedupes by routing dcim.macaddress through the find-first + CREATE path (matcher.requires_pre_save_match). Apply B's CREATE + therefore matches the row apply A just committed instead of + inserting a second one. 
+ """ + suffix = uuid4().hex[:8] + mac = "00:00:00:00:00:42" + + plan_payload = { + "entities": [ + { + "id": f"race-{suffix}", + "object_type": "dcim.interface", + "entity": { + "interface": { + "name": f"eth0-{suffix}", + "type": "1000base-t", + "device": { + "name": f"dev-{suffix}", + "role": {"name": f"role-{suffix}"}, + "site": {"name": f"site-{suffix}"}, + "device_type": { + "manufacturer": {"name": f"mfr-{suffix}"}, + "model": f"dt-{suffix}", + }, + }, + "primary_mac_address": {"mac_address": mac}, + }, + }, + } + ] + } + + plan_url = "/netbox/api/plugins/diode/bulk-plan/" + apply_url = "/netbox/api/plugins/diode/bulk-apply/" + + plan_a = self.client.post( + plan_url, data=plan_payload, format="json", **self.authorization_header + ) + plan_b = self.client.post( + plan_url, data=plan_payload, format="json", **self.authorization_header + ) + self.assertEqual(plan_a.status_code, status.HTTP_200_OK, plan_a.json()) + self.assertEqual(plan_b.status_code, status.HTTP_200_OK, plan_b.json()) + + result_a = plan_a.json()["results"][0] + result_b = plan_b.json()["results"][0] + self.assertIsNone(result_a.get("errors"), result_a) + self.assertIsNone(result_b.get("errors"), result_b) + cs_a = result_a.get("change_set") + cs_b = result_b.get("change_set") + self.assertIsNotNone(cs_a, result_a) + self.assertIsNotNone(cs_b, result_b) + + def mac_creates(change_set): + return [ + c for c in change_set["changes"] + if c["object_type"] == "dcim.macaddress" and c["change_type"] == "create" + ] + + self.assertEqual(len(mac_creates(cs_a)), 1, cs_a) + self.assertEqual(len(mac_creates(cs_b)), 1, cs_b) + + apply_a = self.client.post( + apply_url, + data={"change_sets": [cs_a]}, + format="json", + **self.authorization_header, + ) + apply_b = self.client.post( + apply_url, + data={"change_sets": [cs_b]}, + format="json", + **self.authorization_header, + ) + self.assertEqual(apply_a.status_code, status.HTTP_200_OK, apply_a.json()) + self.assertEqual(apply_b.status_code, 
status.HTTP_200_OK, apply_b.json()) + + macs = MACAddress.objects.filter(mac_address=mac) + self.assertEqual( + macs.count(), + 1, + f"expected exactly one MAC row after dedup, got {macs.count()}: " + f"{list(macs.values('pk', 'mac_address', 'assigned_object_id'))}", + ) + def test_insufficient_scope_returns_403(self): """Token with only read scope cannot call bulk-plan-apply (requires write).""" read_only_user = SimpleNamespace( diff --git a/netbox_diode_plugin/tests/v4.6.x/tests/test_change_log_buffer.py b/netbox_diode_plugin/tests/v4.6.x/tests/test_change_log_buffer.py new file mode 100644 index 0000000..a8cc45c --- /dev/null +++ b/netbox_diode_plugin/tests/v4.6.x/tests/test_change_log_buffer.py @@ -0,0 +1,460 @@ +#!/usr/bin/env python +# Copyright 2026 NetBox Labs, Inc. +"""Diode NetBox Plugin - buffered_change_logging tests.""" + +import logging +from types import SimpleNamespace +from unittest import mock +from uuid import uuid4 + +from core.models import ObjectChange +from dcim.models import Site +from django.contrib.contenttypes.models import ContentType +from django.test import TestCase +from extras.models import Tag +from ipam.models import ASN, RIR +from netbox.config import get_config +from rest_framework import status +from utilities.testing import APITestCase + +from netbox_diode_plugin.api import change_log_buffer, views +from netbox_diode_plugin.api.authentication import DiodeOAuth2Authentication +from netbox_diode_plugin.plugin_config import get_diode_user + +logger = logging.getLogger(__name__) + + +class BufferedChangeLoggingApplyTestCase(APITestCase): + """End-to-end behaviour of `buffered_change_logging` via `/bulk-plan-apply/`.""" + + def setUp(self): + """Auth + clean ObjectChange table for predictable counts.""" + self.url = "/netbox/api/plugins/diode/bulk-plan-apply/" + self.authorization_header = {"HTTP_AUTHORIZATION": "Bearer mocked_oauth_token"} + self.diode_user = SimpleNamespace( + user=get_diode_user(), + token_scopes=["netbox:read", 
"netbox:write"], + token_data={"scope": "netbox:read netbox:write"}, + ) + self.introspect_patcher = mock.patch.object( + DiodeOAuth2Authentication, + "_introspect_token", + return_value=self.diode_user, + ) + self.introspect_patcher.start() + + ObjectChange.objects.all().delete() + + def tearDown(self): + """Stop the auth patcher.""" + self.introspect_patcher.stop() + super().tearDown() + + def _make_payload(self, suffix): + """Build a single-entity bulk-plan-apply payload that creates a fresh Site.""" + return { + "entities": [ + { + "id": f"entity-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Site {suffix}", "slug": f"site-{suffix}"}}, + } + ] + } + + def _site_change_count(self): + """Return the number of ObjectChange rows whose changed_object is a Site.""" + return ObjectChange.objects.filter( + changed_object_type__app_label="dcim", + changed_object_type__model="site", + ).count() + + # --- Setting OFF: buffer is a pass-through, upstream writes synchronously --- + + def test_setting_false_writes_objectchange_synchronously(self): + """With the setting off (default), the apply writes ObjectChange synchronously via upstream.""" + response = self.client.post( + self.url, + data=self._make_payload(uuid4().hex[:8]), + format="json", + **self.authorization_header, + ) + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + # Upstream synchronous path -> row already in DB at request return. + self.assertEqual(self._site_change_count(), 1) + + # --- Setting ON: buffer collects + flushes one bulk_create at commit --- + + def test_setting_true_flushes_objectchange_on_commit(self): + """With the setting on, the buffered ObjectChange is written by the on_commit flush.""" + suffix = uuid4().hex[:8] + # Django's TestCase wraps each test in a transaction that is + # rolled back at end-of-test, which means `transaction.on_commit` + # callbacks normally never fire. 
`captureOnCommitCallbacks` runs + # them explicitly on context exit so we can assert the flush. + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + self.captureOnCommitCallbacks(execute=True): + response = self.client.post( + self.url, data=self._make_payload(suffix), format="json", **self.authorization_header + ) + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + + # Apply transaction committed -> on_commit fired -> exactly one row. + self.assertEqual(self._site_change_count(), 1) + oc = ObjectChange.objects.get( + changed_object_type__app_label="dcim", changed_object_type__model="site" + ) + self.assertEqual(oc.action, "create") + self.assertEqual(oc.object_repr, f"Site {suffix}") + + # --- Rollback: no flush on apply failure --- + + def test_rollback_does_not_flush(self): + """If apply_changeset raises, transaction.on_commit never fires and nothing is written.""" + suffix = uuid4().hex[:8] + + def failing_apply(change_set, request): + Site.objects.create(name=f"Doomed {suffix}", slug=f"doomed-{suffix}") + raise RuntimeError("forced rollback") + + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + mock.patch.object(views, "apply_changeset", side_effect=failing_apply), \ + mock.patch.object(change_log_buffer, "_flush_objectchanges") as mock_flush, \ + self.captureOnCommitCallbacks(execute=True): + self.client.post( + self.url, data=self._make_payload(suffix), format="json", **self.authorization_header + ) + + # Outer atomic rolled back the Site; the append on_commit was discarded. + self.assertFalse(Site.objects.filter(slug=f"doomed-{suffix}").exists()) + # Flush still runs at request end but the batch is empty. 
+ if mock_flush.called: + self.assertEqual(list(mock_flush.call_args.args[0]), []) + + # --- Bypass wins when both flags are enabled --- + + def test_bypass_takes_precedence_over_buffer_when_both_active(self): + """When apply_bypass_change_logging is active, the buffer never collects and nothing is written.""" + suffix = uuid4().hex[:8] + + bypass_token = change_log_buffer._bypass_active.set(True) + try: + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + self.captureOnCommitCallbacks(execute=True): + response = self.client.post( + self.url, data=self._make_payload(suffix), format="json", **self.authorization_header + ) + finally: + change_log_buffer._bypass_active.reset(bypass_token) + + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + # Site was created (apply itself wasn't bypassed), but no ObjectChange. + self.assertTrue(Site.objects.filter(slug=f"site-{suffix}").exists()) + self.assertEqual(self._site_change_count(), 0) + + # --- Request-level batching: many entities -> one consolidated flush --- + + def test_multi_entity_request_flushes_one_batch(self): + """3 entities in one request -> ONE flush carrying all 3 entities' rows.""" + suffix = uuid4().hex[:8] + payload = { + "entities": [ + { + "id": f"entity-{i}-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Site {i} {suffix}", "slug": f"site-{i}-{suffix}"}}, + } + for i in range(3) + ] + } + + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + mock.patch.object( + change_log_buffer, "_flush_objectchanges", + side_effect=change_log_buffer._flush_objectchanges, + ) as spy_flush, \ + self.captureOnCommitCallbacks(execute=True): + response = self.client.post( + self.url, data=payload, format="json", **self.authorization_header + ) + + self.assertEqual(response.status_code, status.HTTP_200_OK, response.json()) + # Single flush for the whole request, not one per entity. 
+ spy_flush.assert_called_once() + flushed = list(spy_flush.call_args.args[0]) + self.assertEqual(len(flushed), 3) + reprs = {oc.object_repr for oc in flushed} + self.assertEqual(reprs, {f"Site 0 {suffix}", f"Site 1 {suffix}", f"Site 2 {suffix}"}) + self.assertEqual(self._site_change_count(), 3) + + def test_failed_entity_excluded_from_batch(self): + """When one entity fails, its rows are dropped from the consolidated flush.""" + suffix = uuid4().hex[:8] + good_slug = f"good-{suffix}" + bad_slug = f"bad-{suffix}" + payload = { + "entities": [ + { + "id": f"good-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Good {suffix}", "slug": good_slug}}, + }, + { + "id": f"bad-{suffix}", + "object_type": "dcim.site", + "entity": {"site": {"name": f"Bad {suffix}", "slug": bad_slug}}, + }, + ] + } + + original_apply = views.apply_changeset + call_count = {"n": 0} + + def selective_failing_apply(change_set, request): + call_count["n"] += 1 + if call_count["n"] == 1: + return original_apply(change_set, request) + Site.objects.create(name=f"Bad inner {suffix}", slug=bad_slug) + raise RuntimeError("forced rollback for second entity") + + with mock.patch.object(change_log_buffer, "get_plugin_config", return_value=True), \ + mock.patch.object(views, "apply_changeset", side_effect=selective_failing_apply), \ + mock.patch.object( + change_log_buffer, "_flush_objectchanges", + side_effect=change_log_buffer._flush_objectchanges, + ) as spy_flush, \ + self.captureOnCommitCallbacks(execute=True): + self.client.post( + self.url, data=payload, format="json", **self.authorization_header + ) + + # Good entity made it through; bad entity rolled back. + self.assertTrue(Site.objects.filter(slug=good_slug).exists()) + self.assertFalse(Site.objects.filter(slug=bad_slug).exists()) + # Flush happens but only contains the good row. 
+ spy_flush.assert_called_once() + flushed = list(spy_flush.call_args.args[0]) + self.assertEqual(len(flushed), 1) + self.assertEqual(flushed[0].object_repr, f"Good {suffix}") + + +class FastSerializeTestCase(TestCase): + """`_fast_serialize_object` parity and the buffer-active gate.""" + + def setUp(self): + """Create a Site with an m2m relation (asns) and two tags.""" + ObjectChange.objects.all().delete() + self.site = Site.objects.create(name="Parity site", slug=f"parity-{uuid4().hex[:8]}") + self.site.tags.add( + Tag.objects.create(name="alpha", slug="alpha"), + Tag.objects.create(name="beta", slug="beta"), + ) + rir = RIR.objects.create(name="Test RIR", slug=f"rir-{uuid4().hex[:8]}") + self.asn = ASN.objects.create(asn=65001, rir=rir) + self.site.asns.add(self.asn) + + def test_fast_serialize_omits_m2m_and_placeholders_tags(self): + """Fast output equals upstream minus m2m, with tags left as an empty placeholder.""" + vanilla = change_log_buffer._original_serialize_object(self.site, exclude=[]) + fast = change_log_buffer._fast_serialize_object(self.site, exclude=[]) + + # `asns` is the only serialisable m2m on Site; the fast path drops it. + m2m_names = {f.name for f in Site._meta.local_many_to_many if f.serialize} + self.assertIn("asns", m2m_names) + self.assertNotIn("asns", fast) + # Tags are a placeholder, resolved in bulk at flush, not per-save. + self.assertEqual(fast["tags"], []) + + # Every other field matches upstream exactly. 
+ expected = {k: v for k, v in vanilla.items() if k not in m2m_names and k != "tags"} + actual = {k: v for k, v in fast.items() if k != "tags"} + self.assertEqual(actual, expected) + + def test_full_parity_after_enrichment(self): + """Fast serialize + m2m + tag enrichment reproduces the upstream serializer output.""" + vanilla = change_log_buffer._original_serialize_object(self.site, exclude=[]) + row = ObjectChange( + changed_object_type_id=ContentType.objects.get_for_model(Site).id, + changed_object_id=self.site.pk, + action="create", + postchange_data=change_log_buffer._fast_serialize_object(self.site, exclude=[]), + ) + change_log_buffer._enrich_m2m([row]) + change_log_buffer._enrich_tags([row]) + + # m2m ordering: enrichment sorts, upstream uses queryset order; + # compare membership for the relation, then the rest exactly. + self.assertEqual(set(row.postchange_data.pop("asns")), set(vanilla.pop("asns"))) + self.assertEqual(row.postchange_data, vanilla) + + def test_serialize_object_gate_is_inactive_without_buffer(self): + """With no buffer active, serialize_object delegates to the upstream implementation.""" + # Identical output to the captured original means the gate did not + # engage the fast path. 
+ self.assertEqual( + self.site.serialize_object(), + change_log_buffer._original_serialize_object(self.site), + ) + + def test_serialize_object_gate_engages_fast_path_with_buffer(self): + """With a buffer active, serialize_object routes through the fast path (no m2m).""" + token = change_log_buffer._apply_change_buffer.set({}) + try: + gated = self.site.serialize_object() + finally: + change_log_buffer._apply_change_buffer.reset(token) + self.assertNotIn("asns", gated) + + +class EnrichM2MTestCase(TestCase): + """`_enrich_m2m` re-adds m2m relations to buffered rows in bulk.""" + + def setUp(self): + """Two Sites, each with distinct ASNs, to exercise per-object grouping.""" + ObjectChange.objects.all().delete() + rir = RIR.objects.create(name="Enrich RIR", slug=f"erir-{uuid4().hex[:8]}") + self.asn1 = ASN.objects.create(asn=65010, rir=rir) + self.asn2 = ASN.objects.create(asn=65011, rir=rir) + self.site_a = Site.objects.create(name="Enrich A", slug=f"enrich-a-{uuid4().hex[:8]}") + self.site_b = Site.objects.create(name="Enrich B", slug=f"enrich-b-{uuid4().hex[:8]}") + self.site_a.asns.add(self.asn1, self.asn2) + self.site_b.asns.add(self.asn1) + self.ct_id = ContentType.objects.get_for_model(Site).id + + def _row(self, site): + return ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=site.pk, + action="create", + postchange_data=change_log_buffer._fast_serialize_object(site, exclude=[]), + ) + + def test_enrich_populates_m2m_per_object(self): + """Each row's postchange_data gets its own object's relation, sorted.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + change_log_buffer._enrich_m2m(rows) + + self.assertEqual(rows[0].postchange_data["asns"], sorted([self.asn1.pk, self.asn2.pk])) + self.assertEqual(rows[1].postchange_data["asns"], [self.asn1.pk]) + + def test_enriched_membership_matches_upstream(self): + """After enrichment, m2m membership matches what the upstream serializer records.""" + vanilla = 
change_log_buffer._original_serialize_object(self.site_a, exclude=[]) + row = self._row(self.site_a) + change_log_buffer._enrich_m2m([row]) + self.assertEqual(set(row.postchange_data["asns"]), set(vanilla["asns"])) + + def test_enrich_is_bulk_one_query_per_relation(self): + """Resolving N objects' relations costs one query per relation, not one per object.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + # Site has a single serialisable m2m (asns) -> exactly one query. + with self.assertNumQueries(1): + change_log_buffer._enrich_m2m(rows) + + +class EnrichTagsTestCase(TestCase): + """`_enrich_tags` fills the tag placeholder left by the fast serializer in bulk.""" + + def setUp(self): + """Two Sites with overlapping tags to exercise per-object grouping.""" + ObjectChange.objects.all().delete() + self.red = Tag.objects.create(name="red", slug="red") + self.blue = Tag.objects.create(name="blue", slug="blue") + self.site_a = Site.objects.create(name="Tag A", slug=f"tag-a-{uuid4().hex[:8]}") + self.site_b = Site.objects.create(name="Tag B", slug=f"tag-b-{uuid4().hex[:8]}") + self.site_a.tags.add(self.red, self.blue) + self.site_b.tags.add(self.red) + self.ct_id = ContentType.objects.get_for_model(Site).id + + def _row(self, site): + return ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=site.pk, + action="create", + postchange_data=change_log_buffer._fast_serialize_object(site, exclude=[]), + ) + + def test_enrich_fills_tags_per_object_sorted(self): + """Each row gets its own object's tag names, sorted.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + change_log_buffer._enrich_tags(rows) + self.assertEqual(rows[0].postchange_data["tags"], ["blue", "red"]) + self.assertEqual(rows[1].postchange_data["tags"], ["red"]) + + def test_enrich_tags_one_query_per_content_type(self): + """N taggable objects of one model cost a single tag query, not one per object.""" + rows = [self._row(self.site_a), self._row(self.site_b)] + with 
self.assertNumQueries(1): + change_log_buffer._enrich_tags(rows) + + def test_enrich_tags_empty_for_untagged_object(self): + """A taggable object with no tags keeps the empty placeholder.""" + site_c = Site.objects.create(name="Tag C", slug=f"tag-c-{uuid4().hex[:8]}") + row = self._row(site_c) + change_log_buffer._enrich_tags([row]) + self.assertEqual(row.postchange_data["tags"], []) + + def test_enrich_tags_skips_rows_without_placeholder(self): + """Rows whose postchange_data has no `tags` key (non-taggable shape) are not given one.""" + row = ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=self.site_a.pk, + action="create", + postchange_data={"name": "no-tags-key"}, + ) + change_log_buffer._enrich_tags([row]) + self.assertNotIn("tags", row.postchange_data) + + +class SnapshotForApplyTestCase(TestCase): + """`snapshot_for_apply` captures prechange consistent with the buffered postchange.""" + + def setUp(self): + """A Site with an m2m relation (asns) and a tag.""" + ObjectChange.objects.all().delete() + self.site = Site.objects.create(name="Snap site", slug=f"snap-{uuid4().hex[:8]}") + self.site.tags.add(Tag.objects.create(name="green", slug="green")) + rir = RIR.objects.create(name="Snap RIR", slug=f"srir-{uuid4().hex[:8]}") + self.asn = ASN.objects.create(asn=65100, rir=rir) + self.site.asns.add(self.asn) + self.ct_id = ContentType.objects.get_for_model(Site).id + self._exclude = ["last_updated"] if get_config().CHANGELOG_SKIP_EMPTY_CHANGES else [] + + def _snapshot_with_buffer(self): + token = change_log_buffer._apply_change_buffer.set({}) + try: + change_log_buffer.snapshot_for_apply(self.site) + finally: + change_log_buffer._apply_change_buffer.reset(token) + return self.site._prechange_snapshot + + def test_buffer_active_captures_sorted_m2m_and_tags(self): + """With the buffer active, prechange records m2m + tags resolved now, sorted.""" + snap = self._snapshot_with_buffer() + self.assertEqual(snap["asns"], [self.asn.pk]) + 
self.assertEqual(snap["tags"], ["green"]) + + def test_prechange_matches_postchange_for_unchanged_object(self): + """Prechange and postchange are identical for an unmodified object (no spurious diff).""" + prechange = self._snapshot_with_buffer() + + # Build postchange exactly as the buffered flush would. + row = ObjectChange( + changed_object_type_id=self.ct_id, + changed_object_id=self.site.pk, + action="update", + postchange_data=change_log_buffer._fast_serialize_object(self.site, exclude=self._exclude), + ) + change_log_buffer._enrich_m2m([row]) + change_log_buffer._enrich_tags([row]) + + self.assertEqual(prechange, row.postchange_data) + + def test_buffer_inactive_delegates_to_netbox_snapshot(self): + """With no buffer, prechange comes from NetBox's own snapshot (full serializer).""" + if hasattr(self.site, "_prechange_snapshot"): + del self.site._prechange_snapshot + change_log_buffer.snapshot_for_apply(self.site) + self.assertTrue(hasattr(self.site, "_prechange_snapshot")) + # The full serializer includes the m2m relation directly. + self.assertIn("asns", self.site._prechange_snapshot)