From 4fc5e9f31871fa50c0c7f07d82bbb18f84cdd276 Mon Sep 17 00:00:00 2001 From: earayu Date: Mon, 27 Apr 2026 19:30:33 +0800 Subject: [PATCH 1/2] feat(celery Wave 6 #34): chunk_id schema canonical unification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per docs/modularization/indexing-redesign-design-pack.md §K.11.1 #34 (huangheng T1 obs B + Wave 5 P5B chunk_id 5th item deferred). ## Changes aperag/indexing/vector.py - VectorBackend.upsert_point(chunk_id=) → upsert_point(point_id=) (canonical Qdrant naming, aligned with summary/vision protocols) - InMemoryVectorBackend record key renamed chunk_id → point_id - Vector worker callsite passes point_id=chunk["chunk_id"]; chunk_id remains in payload for hybrid-dedup with fulltext (§C.6 trade-off lock) aperag/indexing/worker_factory.py - _QdrantPointBackend.upsert_point(): single point_id keyword (drop pre-Wave-6 dual chunk_id|point_id transition shim) - Drop merged_payload.setdefault("chunk_id", identifier) — adapter no longer injects misleading chunk_id into summary/vision payloads (their points are not chunks); each modality controls its payload aperag/indexing/reconciler.py - utc_now audit comment: utc_now usage in reconciler is application-level wall-clock for lease comparison + gmt_updated stamps, distinct from Wave 5 P5B server_default=CURRENT_TIMESTAMP ORM-creation defaults. No further migration needed. tests/unit_test/indexing/test_chunk_id_schema_canonical.py (new) - 7 contract tests pin canonical naming: * VectorBackend / SummaryBackend / VisionBackend Protocols use point_id * InMemoryVectorBackend round-trips point_id at record level + chunk_id at payload level * Legacy chunk_id= keyword raises TypeError on InMemoryVectorBackend * _QdrantPointBackend.upsert_point uses single point_id param * _QdrantPointBackend does not inject chunk_id into payload * Parser chunks.jsonl chunk_id field naming preserved tests/integration/test_cleanup_fan_out.py tests/unit_test/indexing/test_t1_3_vector_fulltext.py tests/unit_test/indexing/test_t2_1_runtime.py tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py - backend.upsert_point(chunk_id=...) → point_id=... - record reads p["chunk_id"] → p["point_id"] / p["payload"]["chunk_id"] ## Production-readiness 三类 layer (per §K.11.4) - must-be-real: parser layer chunk_id field schema unified across vector/summary/vision backend protocol surfaces; remaining utc_now usage audited (reconciler.py only, application-level, distinct from ORM defaults Wave 5 P5B already migrated) - may-be-gated: vector worker still keeps chunk_id in payload for hybrid-dedup with fulltext (§C.6 contract preserved) - fully-resolves: huangheng T1 obs B + Wave 5 P5B chunk_id 5th item deferred (per spec §K.11.1 row 34) ## simple-stable 4 guardrail (per feedback_simple_stable_zero_maintenance.md) 1. 不无限扩范围 ✅ — scope limited to vector protocol + adapter, no cross-cutting touch 2. 功能做实 ✅ — real schema rename, no transitional shim left behind 3. 简单稳定 ✅ — drops dual-API polymorphism + drops hidden payload side-effect (setdefault chunk_id), each layer's contract is now self-evident from signature 4. 私有化免维护 ✅ — operator/dev reading code sees canonical naming without needing to track which arg name belongs to which modality ## hard-cut policy (per earayu2 msg=30c81478) No production data → schema breaking change applied directly. No backward-compat shim, no deprecation window. Test that the legacy chunk_id= keyword raises TypeError catches accidental regression. ## Pre-check (per K.11.5 Pattern 2) grep upsert_point across aperag/indexing/ + tests/ — all callsites audited, all callers migrated, contract test pins canonical name. ## Local gates - 152/152 indexing unit tests pass - 21/21 indexing integration tests pass (cleanup_fan_out + dispatch_with_parse + inline_mode_smoke + parse_async_roundtrip) - 7/7 new contract tests pass - ruff check + ruff format clean Co-Authored-By: Claude Opus 4.7 --- aperag/indexing/reconciler.py | 14 +- aperag/indexing/vector.py | 31 +-- aperag/indexing/worker_factory.py | 43 ++-- tests/integration/test_cleanup_fan_out.py | 12 +- .../test_chunk_id_schema_canonical.py | 184 ++++++++++++++++++ .../indexing/test_t1_3_vector_fulltext.py | 4 +- tests/unit_test/indexing/test_t2_1_runtime.py | 8 +- .../indexing/test_t3_1_dispatcher_path_c.py | 6 +- 8 files changed, 252 insertions(+), 50 deletions(-) create mode 100644 tests/unit_test/indexing/test_chunk_id_schema_canonical.py diff --git a/aperag/indexing/reconciler.py b/aperag/indexing/reconciler.py index 9727b3c92..2702e01bd 100644 --- a/aperag/indexing/reconciler.py +++ b/aperag/indexing/reconciler.py @@ -325,7 +325,19 @@ async def reconcile_collection_summaries_hook( ) def _reclaim_stale_and_claim_pending() -> list[tuple[str, str, int, str]]: - """Sync DB-only worker. Returns list of claimed dispatch tuples.""" + """Sync DB-only worker. Returns list of claimed dispatch tuples. + + Note (Wave 6 #34 utc_now audit): the ``utc_now`` calls in this + function are application-level wall-clock reads used to compute + lease expiry windows and stamp ``gmt_last_reconciled`` / + ``gmt_updated`` for in-flight rows. They are intentionally + Python-side and distinct from the ORM-default + ``server_default=CURRENT_TIMESTAMP`` migration done in Wave 5 + P5B for ``_LineageEntityRow`` / ``_LineageRelationRow``: those + cover row-creation defaults; reconciler updates need a wall- + clock value materialised inside the worker process for lease + comparison logic, so server-side defaults do not apply here. + """ from aperag.utils.utils import utc_now as _utc_now claimed_dispatches: list[tuple[str, str, int, str]] = [] diff --git a/aperag/indexing/vector.py b/aperag/indexing/vector.py index fcd8a9a27..d30e94b85 100644 --- a/aperag/indexing/vector.py +++ b/aperag/indexing/vector.py @@ -75,17 +75,24 @@ def delete_by_filter(self, *, document_id: str, parse_version: str) -> int: def upsert_point( self, *, - chunk_id: str, + point_id: str, embedding: list[float], payload: dict[str, Any], ) -> None: - """Idempotent point insert keyed on ``chunk_id`` (Qdrant point id).""" + """Idempotent point insert keyed on ``point_id`` (Qdrant point id). + + For the vector modality the caller passes the parser-emitted + ``chunk_id`` as ``point_id`` (one chunk → one point). The + ``chunk_id`` value is also written into ``payload["chunk_id"]`` + by the worker so retrieval can echo it back; vector + fulltext + share that payload field for hybrid-dedup (§C.6). + """ class InMemoryVectorBackend: """Process-local in-memory backend for unit tests. - Stores points in a dict keyed by ``chunk_id``. Implements the + Stores points in a dict keyed by ``point_id``. Implements the :class:`VectorBackend` protocol so vector.sync can target it transparently. """ @@ -95,22 +102,22 @@ def __init__(self) -> None: def delete_by_filter(self, *, document_id: str, parse_version: str) -> int: deleted = 0 - for chunk_id in list(self._points): - payload = self._points[chunk_id].get("payload", {}) + for point_id in list(self._points): + payload = self._points[point_id].get("payload", {}) if payload.get("document_id") == document_id and payload.get("parse_version") == parse_version: - self._points.pop(chunk_id) + self._points.pop(point_id) deleted += 1 return deleted def upsert_point( self, *, - chunk_id: str, + point_id: str, embedding: list[float], payload: dict[str, Any], ) -> None: - self._points[chunk_id] = { - "chunk_id": chunk_id, + self._points[point_id] = { + "point_id": point_id, "embedding": list(embedding), "payload": dict(payload), } @@ -126,10 +133,10 @@ def points_for_document(self, document_id: str, parse_version: str | None = None if parse_version is not None and payload.get("parse_version") != parse_version: continue out.append(record) - return sorted(out, key=lambda r: r["chunk_id"]) + return sorted(out, key=lambda r: r["point_id"]) def all_points(self) -> list[dict[str, Any]]: - return sorted(self._points.values(), key=lambda r: r["chunk_id"]) + return sorted(self._points.values(), key=lambda r: r["point_id"]) def _placeholder_embedding(text: str, dim: int = SIMULATOR_EMBEDDING_DIM) -> list[float]: @@ -229,7 +236,7 @@ async def sync( "page_idx": chunk.get("page_idx"), } self._backend.upsert_point( - chunk_id=chunk_id, + point_id=chunk_id, embedding=embedding, payload=payload, ) diff --git a/aperag/indexing/worker_factory.py b/aperag/indexing/worker_factory.py index b7378f44c..c994dd24c 100644 --- a/aperag/indexing/worker_factory.py +++ b/aperag/indexing/worker_factory.py @@ -94,10 +94,16 @@ class _QdrantPointBackend: vision modalities consume. All three modalities share the same Qdrant-shaped surface (delete - by ``(document_id, parse_version)`` filter, upsert by - ``chunk_id``/``point_id``). One adapter class satisfies the three - Protocols structurally — no inheritance needed because the - Protocols are ``@runtime_checkable``. + by ``(document_id, parse_version)`` filter, upsert by ``point_id``). + One adapter class satisfies the three Protocols structurally — no + inheritance needed because the Protocols are ``@runtime_checkable``. + + The ``point_id`` is the canonical name for the Qdrant point + identifier across all three modalities (per Wave 6 #34 schema + unification). Vector workers pass the parser-emitted ``chunk_id`` + here; summary uses ``summary::``; vision uses + ``vision:::``. Each worker controls its own + payload — the adapter does not inject anything. """ def __init__(self, *, connector: Any) -> None: @@ -120,43 +126,34 @@ def delete_by_filter(self, *, document_id: str, parse_version: str) -> int: def upsert_point( self, *, - chunk_id: str | None = None, - point_id: str | None = None, + point_id: str, embedding: list[float], payload: dict[str, Any], ) -> None: - # Vector modality calls with ``chunk_id``; summary / vision - # modalities call with ``point_id``. Both end up as the - # underlying Qdrant point id. - # # Qdrant only accepts unsigned-integer or UUID point ids. The # T1.1 parser produces chunk ids of the form # ``:`` (e.g. ``f766a946575ec3b4:0000``) # which Qdrant rejects with HTTP 400 "is not a valid point # ID". Map the caller-supplied string id into a deterministic # UUID5 so retries land on the same point and the upsert is - # idempotent — and stash the original id in the payload so - # the read path can still surface it to clients. + # idempotent. The caller's payload is forwarded verbatim — it + # already carries whatever modality-specific identifier the + # read path needs (vector keeps ``chunk_id`` in payload for + # hybrid-dedup; summary uses ``summary_text``; vision uses + # ``image_id``). import uuid from aperag.vectorstore.dto import VectorPoint - identifier = chunk_id if chunk_id is not None else point_id - if not identifier: - raise ValueError("upsert_point requires either chunk_id or point_id") - identifier = str(identifier) - qdrant_id = str(uuid.uuid5(uuid.NAMESPACE_OID, identifier)) - merged_payload = dict(payload) - # Preserve the original id under a stable key so the read - # path can echo it back; ``chunk_id`` is what vector modality - # already writes so we don't overwrite it. - merged_payload.setdefault("chunk_id", identifier) + if not point_id: + raise ValueError("upsert_point requires point_id") + qdrant_id = str(uuid.uuid5(uuid.NAMESPACE_OID, str(point_id))) self._connector.upsert( [ VectorPoint( id=qdrant_id, vector=list(embedding), - payload=merged_payload, + payload=dict(payload), ) ] ) diff --git a/tests/integration/test_cleanup_fan_out.py b/tests/integration/test_cleanup_fan_out.py index 420757146..60842e5c2 100644 --- a/tests/integration/test_cleanup_fan_out.py +++ b/tests/integration/test_cleanup_fan_out.py @@ -136,7 +136,7 @@ async def factory_closure(row: DocumentIndex): pv_a = "doc-del-a-pv0001"[:16] _insert_row(engine, document_id="doc-del", parse_version=pv_a, modality=Modality.VECTOR) factory_backend.upsert_point( - chunk_id="chunk-fac", + point_id="chunk-fac", embedding=[0.0] * 16, payload={ "document_id": "doc-del", @@ -150,7 +150,7 @@ async def factory_closure(row: DocumentIndex): }, ) fallback_backend.upsert_point( - chunk_id="chunk-fallback", + point_id="chunk-fallback", embedding=[0.0] * 16, payload={ "document_id": "doc-del", @@ -245,7 +245,7 @@ async def factory_closure(row: DocumentIndex): backend = backends[(col, modality)] if modality is Modality.VECTOR: backend.upsert_point( - chunk_id=chunk_id, + point_id=chunk_id, embedding=[0.0] * 16, payload={ "document_id": document_id, @@ -360,7 +360,7 @@ def test_orphan_parse_version_gc_uses_worker_factory(engine): _set_updated_at(engine, old_id, _utcnow() - timedelta(hours=2)) backend.upsert_point( - chunk_id="chunk-orphan", + point_id="chunk-orphan", embedding=[0.0] * 16, payload={ "document_id": "doc-orphan", @@ -415,7 +415,7 @@ def test_collection_deletion_cascade_uses_worker_factory(engine): collection_id="col-doomed", ) backend.upsert_point( - chunk_id="chunk-col", + point_id="chunk-col", embedding=[0.0] * 16, payload={ "document_id": "doc-col", @@ -489,7 +489,7 @@ def test_workers_map_only_path_unchanged_for_existing_callers(engine): pv = "compatparsever01"[:16] _insert_row(engine, document_id="doc-compat", parse_version=pv, modality=Modality.VECTOR) backend.upsert_point( - chunk_id="chunk-compat", + point_id="chunk-compat", embedding=[0.0] * 16, payload={ "document_id": "doc-compat", diff --git a/tests/unit_test/indexing/test_chunk_id_schema_canonical.py b/tests/unit_test/indexing/test_chunk_id_schema_canonical.py new file mode 100644 index 000000000..5f9fd880d --- /dev/null +++ b/tests/unit_test/indexing/test_chunk_id_schema_canonical.py @@ -0,0 +1,184 @@ +# Copyright 2025 ApeCloud, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Wave 6 #34 — chunk_id schema canonical unification contract tests. + +Pins the single canonical naming for the Qdrant point identifier +across the three modalities that share the +``{delete_by_filter, upsert_point}`` backend protocol (vector / +summary / vision): + +- The ``upsert_point`` keyword arg is ``point_id`` (singular canonical + name). The pre-Wave-6 dual API (``chunk_id | point_id``) was a + transition shim; vector callers now pass ``point_id=chunk["chunk_id"]`` + while summary/vision pass composite ids + (``summary::`` / ``vision:::``). +- The ``chunk_id`` field stays on the **payload** for vector — that is + what hybrid retrieval reads to dedup against fulltext (per §C.6). + Summary/vision payloads do **not** carry ``chunk_id`` — their points + are not chunks. +- The shared adapter (``_QdrantPointBackend``) does **not** inject + ``chunk_id`` into payload anymore; each modality controls its own + payload schema. +""" + +from __future__ import annotations + +import inspect +import json + +import pytest + +from aperag.indexing.summary import SummaryBackend +from aperag.indexing.vector import InMemoryVectorBackend, VectorBackend +from aperag.indexing.vision import VisionBackend + + +def test_vector_backend_protocol_uses_point_id_keyword(): + """The :class:`VectorBackend` ``upsert_point`` accepts ``point_id``, + not the pre-Wave-6 ``chunk_id`` keyword. This pins the canonical + name for vector at the Protocol surface.""" + sig = inspect.signature(VectorBackend.upsert_point) + params = sig.parameters + assert "point_id" in params, "VectorBackend.upsert_point must accept point_id" + assert "chunk_id" not in params, "VectorBackend.upsert_point must NOT accept chunk_id (Wave 6 #34 canonical rename)" + + +def test_summary_and_vision_backend_protocols_use_point_id_keyword(): + """Summary + vision already used ``point_id`` pre-Wave-6; pin that + they remain on the canonical name (cross-modality alignment).""" + for backend_cls in (SummaryBackend, VisionBackend): + sig = inspect.signature(backend_cls.upsert_point) + params = sig.parameters + assert "point_id" in params, f"{backend_cls.__name__}.upsert_point must accept point_id" + assert "chunk_id" not in params, f"{backend_cls.__name__}.upsert_point must NOT accept chunk_id" + + +def test_in_memory_vector_backend_round_trips_point_id_and_payload_chunk_id(): + """Vector worker passes parser ``chunk_id`` as ``point_id`` while + keeping ``chunk_id`` in payload for hybrid-dedup. Pin both the + record key and the payload field.""" + backend = InMemoryVectorBackend() + backend.upsert_point( + point_id="abc:0001", + embedding=[0.0] * 4, + payload={ + "document_id": "d1", + "parse_version": "v1", + "modality": "vector", + "chunk_id": "abc:0001", + }, + ) + points = backend.points_for_document("d1", "v1") + assert len(points) == 1 + record = points[0] + assert record["point_id"] == "abc:0001" + assert record["payload"]["chunk_id"] == "abc:0001", ( + "vector worker must keep chunk_id in payload for hybrid dedup with fulltext (§C.6)" + ) + + +def test_in_memory_vector_backend_rejects_legacy_chunk_id_keyword(): + """Pin that the legacy ``chunk_id=`` keyword is gone from the + Wave 6-canonical API — callers must pass ``point_id=``. This + catches accidental regression to the pre-Wave-6 dual API.""" + backend = InMemoryVectorBackend() + with pytest.raises(TypeError): + backend.upsert_point( + chunk_id="abc:0001", # type: ignore[call-arg] + embedding=[0.0] * 4, + payload={"document_id": "d1", "parse_version": "v1"}, + ) + + +def test_qdrant_point_backend_adapter_uses_single_point_id_keyword(): + """The shared production adapter (``_QdrantPointBackend``) now + exposes a single ``point_id`` parameter. Pre-Wave-6 the adapter + accepted ``chunk_id | point_id`` polymorphically as a transition + shim; that polymorphism is removed.""" + from aperag.indexing.worker_factory import _QdrantPointBackend + + sig = inspect.signature(_QdrantPointBackend.upsert_point) + params = sig.parameters + assert "point_id" in params + assert "chunk_id" not in params + + +def test_qdrant_point_backend_adapter_does_not_inject_chunk_id_into_payload(): + """The pre-Wave-6 adapter would ``setdefault("chunk_id", identifier)`` + into the caller's payload — leaking a misleading ``chunk_id`` field + into summary / vision payloads (their points are not chunks). Pin + that the Wave 6 adapter forwards the caller's payload verbatim and + does not synthesise any field.""" + from aperag.indexing.worker_factory import _QdrantPointBackend + + captured: dict = {} + + class _StubConnector: + def upsert(self, points): + # Capture the payload that the adapter forwards to the + # underlying connector so we can assert the adapter did + # not inject anything. + captured["points"] = list(points) + + def delete_by_filter(self, flt): # noqa: D401 + pass + + backend = _QdrantPointBackend(connector=_StubConnector()) + backend.upsert_point( + point_id="summary:d1:v1", + embedding=[0.0] * 4, + payload={ + "document_id": "d1", + "parse_version": "v1", + "modality": "summary", + "summary_text": "x", + }, + ) + forwarded_payload = captured["points"][0].payload + assert "chunk_id" not in forwarded_payload, ( + "adapter must not inject chunk_id into summary payload — summary points are not chunks (Wave 6 #34 cleanup)" + ) + # Caller payload preserved verbatim. + assert forwarded_payload["modality"] == "summary" + assert forwarded_payload["summary_text"] == "x" + + +def test_parser_chunks_jsonl_field_is_chunk_id(): + """The parser-emitted chunks.jsonl schema is the source of truth + for vector + fulltext: every chunk record carries a ``chunk_id`` + field. Pin the field name on a synthetic chunks.jsonl payload — + the canonical naming is ``chunk_id`` at the chunk-record level + (one chunk → one Qdrant point), and that record's ``chunk_id`` + is what the worker passes as ``point_id`` at the backend layer.""" + chunks = [ + { + "chunk_id": "h:0000", + "text": "alpha", + "section_path": None, + "heading_anchor": None, + "page_idx": None, + }, + { + "chunk_id": "h:0001", + "text": "bravo", + "section_path": None, + "heading_anchor": None, + "page_idx": None, + }, + ] + serialised = "\n".join(json.dumps(c) for c in chunks).encode("utf-8") + decoded = [json.loads(line) for line in serialised.decode("utf-8").splitlines() if line] + assert all("chunk_id" in c for c in decoded) + assert {c["chunk_id"] for c in decoded} == {"h:0000", "h:0001"} diff --git a/tests/unit_test/indexing/test_t1_3_vector_fulltext.py b/tests/unit_test/indexing/test_t1_3_vector_fulltext.py index baa1dd5b3..ccf3d054a 100644 --- a/tests/unit_test/indexing/test_t1_3_vector_fulltext.py +++ b/tests/unit_test/indexing/test_t1_3_vector_fulltext.py @@ -210,7 +210,9 @@ def test_vector_and_fulltext_share_chunk_ids_for_hybrid_dedup(): ) ) - vec_chunk_ids = {p["chunk_id"] for p in vec_backend.points_for_document("doc-beacon", parsed.parse_version)} + vec_chunk_ids = { + p["payload"]["chunk_id"] for p in vec_backend.points_for_document("doc-beacon", parsed.parse_version) + } ft_chunk_ids = {d["chunk_id"] for d in ft_backend.documents_for_document("doc-beacon", parsed.parse_version)} assert vec_chunk_ids == ft_chunk_ids, ( diff --git a/tests/unit_test/indexing/test_t2_1_runtime.py b/tests/unit_test/indexing/test_t2_1_runtime.py index a0d3d49e4..ae02ddfc0 100644 --- a/tests/unit_test/indexing/test_t2_1_runtime.py +++ b/tests/unit_test/indexing/test_t2_1_runtime.py @@ -665,7 +665,7 @@ def test_cleanup_deletes_superseded_parse_version_after_cooldown(engine): # Pre-populate the backend so we can assert the cleanup deletes. backend.upsert_point( - chunk_id="chunk-old", + point_id="chunk-old", embedding=[0.1] * 16, payload={ "document_id": "doc-1", @@ -679,7 +679,7 @@ def test_cleanup_deletes_superseded_parse_version_after_cooldown(engine): }, ) backend.upsert_point( - chunk_id="chunk-new", + point_id="chunk-new", embedding=[0.2] * 16, payload={ "document_id": "doc-1", @@ -704,7 +704,7 @@ def test_cleanup_deletes_superseded_parse_version_after_cooldown(engine): # Old backend entry is gone; new entry survives. surviving = backend.points_for_document("doc-1") - surviving_chunk_ids = {p["chunk_id"] for p in surviving} + surviving_chunk_ids = {p["point_id"] for p in surviving} assert surviving_chunk_ids == {"chunk-new"} # Old DB row is gone; new row still there. @@ -827,7 +827,7 @@ def test_cleanup_for_deleted_documents_removes_non_graph_backend_per_parse_versi ) for pv, chunk_id in ((pv_a, "chunk-a"), (pv_b, "chunk-b")): backend.upsert_point( - chunk_id=chunk_id, + point_id=chunk_id, embedding=[0.0] * 16, payload={ "document_id": "doc-del", diff --git a/tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py b/tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py index 53c4f1c13..79a5d6cbd 100644 --- a/tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py +++ b/tests/unit_test/indexing/test_t3_1_dispatcher_path_c.py @@ -316,7 +316,7 @@ def test_path_c_cascades_via_path_b_and_sweeps_collection_rows(engine): ("chunk-O", "docOther", "poooooooooooooo1"), ): backend.upsert_point( - chunk_id=chunk_id, + point_id=chunk_id, embedding=[0.0] * 16, payload={ "document_id": doc, @@ -344,7 +344,7 @@ def test_path_c_cascades_via_path_b_and_sweeps_collection_rows(engine): assert counts["backend_skipped"] == 0 # Backend: only the other-collection chunk survives. - surviving_chunks = {p["chunk_id"] for p in backend.all_points()} + surviving_chunks = {p["point_id"] for p in backend.all_points()} assert surviving_chunks == {"chunk-O"} # DB: only the other-collection row survives. @@ -382,7 +382,7 @@ def test_path_c_idempotent_on_re_run(engine): engine, document_id="docZ", parse_version="pzzzzzzzzzzzzzz1", modality=Modality.VECTOR, collection_id="col-Z" ) backend.upsert_point( - chunk_id="chunk-Z", + point_id="chunk-Z", embedding=[0.0] * 16, payload={ "document_id": "docZ", From dcd4b588966913622ffe5285f41b1e43d6bd5619 Mon Sep 17 00:00:00 2001 From: earayu Date: Mon, 27 Apr 2026 19:35:34 +0800 Subject: [PATCH 2/2] feat(celery Wave 6 #38): application cache cross-loop lazy-rebuild + WARN log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per docs/modularization/indexing-redesign-design-pack.md §K.11.1 #38 (huangheng PR #1734 sync point 7 + feedback_announce_equals_landed.md narrative-truth invariant). ## Problem (pre-Wave-6 narrative-truth violation) `aperag.cache.application_runtime.get_application_cache()` keyed the cached `ApplicationCache` instance on the running asyncio event loop (the Redis async client is loop-bound). When the running loop changed (test process restart, worker reinit, asyncio re-initialisation, etc.), the function silently swapped the cache for a `NoopApplicationCacheBackend` with `enabled=False`. From that point on the worker paid the full LiteLLM / embedding round-trip on every request, with no log line, no metric, and no operator-visible signal that caching had degraded to zero. ## Fix When the loop-switch branch fires: - emit a WARN log with operator-actionable phrasing - bump `application_cache_metrics["application_runtime"]["loop_switch_rebuild"]` (uses the existing ApplicationCacheMetrics instance — no new metrics infra) - reset `_async_cache=None` and fall through to the normal initialisation path (which re-establishes the real Redis client on the new loop) The Noop fallback is now reached only when Redis is genuinely unreachable on the new loop — never as a silent loop-switch downgrade. ## Tests (4 new) tests/unit_test/cache/test_application_cache.py - test_application_cache_rebuilds_on_loop_switch_instead_of_silent_noop: two `_run_in_new_loop()` calls produce distinct ApplicationCache instances, second wraps real `ApplicationRedisCacheBackend` (not `NoopApplicationCacheBackend`); metric counter incremented to 1; WARN log captured. - test_application_cache_does_not_increment_loop_switch_metric_on_first_build: first-ever build does not bump the loop-switch counter. - test_application_cache_repeat_call_in_same_loop_is_singleton_no_metric_bump: same-loop repeat returns identical singleton, no metric bump. - test_application_cache_falls_back_to_noop_when_redis_unreachable_on_new_loop: if Redis genuinely fails on the new loop, rebuild attempts the real client, falls back to Noop, but metric still records the loop switch (operators see the rebuild attempt + the Redis failure WARN that was already there). ## Production-readiness 三类 layer (per §K.11.4) - must-be-real: real WARN log + real metric counter + real Redis client rebuild on cross-loop call (no silent zero-cache) - may-be-gated: first cross-loop request may pay rebuild cost (re-ping Redis) — observable via metric, not silent - fully-resolves: huangheng PR #1734 sync point 7 + feedback_announce_equals_landed.md narrative-truth lock ## simple-stable 4 guardrail 1. 不无限扩范围 ✅ — single-function fix in application_runtime.py, reuses existing `application_cache_metrics` infra, no new public API 2. 功能做实 ✅ — real cache rebuild not a placeholder 3. 简单稳定 ✅ — fewer paths than before (one rebuild path replaces the silent-Noop branch), each branch logged 4. 私有化免维护 ✅ — operator can grep `loop_switch_rebuild` in metrics + log "rebuilding for new event loop" to diagnose worker setup issues ## Local gates - 43/43 cache unit tests pass (39 existing + 4 new) - ruff check + ruff format clean Co-Authored-By: Claude Opus 4.7 --- aperag/cache/application_runtime.py | 32 +++- .../unit_test/cache/test_application_cache.py | 173 ++++++++++++++++++ 2 files changed, 200 insertions(+), 5 deletions(-) diff --git a/aperag/cache/application_runtime.py b/aperag/cache/application_runtime.py index cf91a22ee..34340a928 100644 --- a/aperag/cache/application_runtime.py +++ b/aperag/cache/application_runtime.py @@ -43,17 +43,39 @@ def application_cache_policy(namespace: str, *, enabled: bool | None = None) -> async def get_application_cache() -> ApplicationCache: + """Return the singleton :class:`ApplicationCache` for the running + event loop, lazily building it if needed. + + Wave 6 #38 (per `feedback_announce_equals_landed.md` narrative-truth + invariant): when the running event loop changes (test process + starts a new loop, worker process reinitialises, etc.) the cached + instance — whose underlying ``async_redis.Redis`` client is bound + to the prior loop — is no longer usable. Pre-Wave-6 we silently + swapped it for a :class:`NoopApplicationCacheBackend`, which meant + callers paid LiteLLM/embedding cost on every request from then on + with no signal that caching had degraded. + + Wave 6 fix: rebuild the cache on the new loop (re-establish the + real Redis client) and emit a WARN log + bump + ``application_cache_metrics["application_runtime"]["loop_switch_rebuild"]`` + so operators can observe loop-switch frequency. Rebuild can fail + (Redis unreachable on the new loop) — the fallback is still a + Noop backend, but only when Redis is actually broken, not when + the loop merely changed. + """ global _async_cache, _async_cache_loop loop = asyncio.get_running_loop() if _async_cache is not None and _async_cache_loop is loop: return _async_cache if _async_cache is not None and _async_cache_loop is not loop: - _async_cache = ApplicationCache( - backend=NoopApplicationCacheBackend(), - default_policy=application_cache_policy("default", enabled=False), + logger.warning( + "Application cache rebuilding for new event loop: prior cache was bound " + "to a different loop and its Redis client cannot be reused. " + "Frequent loop switches indicate a worker / test setup issue." ) - _async_cache_loop = loop - return _async_cache + application_cache_metrics.increment("application_runtime", "loop_switch_rebuild") + _async_cache = None + _async_cache_loop = None if not settings.cache_enabled: _async_cache = ApplicationCache( diff --git a/tests/unit_test/cache/test_application_cache.py b/tests/unit_test/cache/test_application_cache.py index 74222fe77..efcbfb265 100644 --- a/tests/unit_test/cache/test_application_cache.py +++ b/tests/unit_test/cache/test_application_cache.py @@ -179,3 +179,176 @@ async def ping(self): assert from_url.call_args.args[0] == "redis://cache.example/2" shared_client.assert_not_called() application_runtime.reset_application_cache_for_tests() + + +# --------------------------------------------------------------------- +# Wave 6 #38 — Application cache cross-loop lazy-rebuild + WARN log +# (per spec §K.11.1 row 38 + huangheng PR #1734 sync point 7). +# --------------------------------------------------------------------- + + +def _run_in_new_loop(coro_factory): + """Drive ``coro_factory()`` to completion on a fresh event loop and + close it. Each call gives the cache module a distinct + ``asyncio.AbstractEventLoop`` identity so the cross-loop branch is + exercised.""" + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro_factory()) + finally: + loop.close() + + +def test_application_cache_rebuilds_on_loop_switch_instead_of_silent_noop(monkeypatch, caplog): + """The pre-Wave-6 implementation silently swapped the cache for a + Noop backend whenever the running event loop changed. That meant + a worker that switched loops (test process, asyncio reinit, etc.) + paid full LiteLLM/embedding cost from then on with no log. Wave 6 + #38: detect the loop switch, emit a WARN, bump a metric counter, + and rebuild against the real Redis backend on the new loop.""" + from aperag.cache import application_runtime + from aperag.cache.application import application_cache_metrics + + application_runtime.reset_application_cache_for_tests() + monkeypatch.setattr(application_runtime.settings, "cache_enabled", True) + monkeypatch.setattr(application_runtime.settings, "cache_redis_url", "redis://cache.example/2") + + class FakeRedisClient: + async def ping(self): + return True + + fake_clients: list[FakeRedisClient] = [] + + def _from_url(*args, **kwargs): + client = FakeRedisClient() + fake_clients.append(client) + return client + + with patch.object(application_runtime.async_redis.Redis, "from_url", side_effect=_from_url): + first = _run_in_new_loop(application_runtime.get_application_cache) + # First call on a fresh loop should never count as a switch. + assert application_cache_metrics.snapshot().get("application_runtime", {}).get("loop_switch_rebuild", 0) == 0 + + with caplog.at_level("WARNING"): + second = _run_in_new_loop(application_runtime.get_application_cache) + + # Two distinct loops → two distinct cache instances (rebuilt, not + # the same singleton handed back). + assert second is not first + # And critically, NOT a Noop fallback — the rebuilt cache wraps the + # real Redis backend on the new loop. + assert isinstance(second._backend, application_runtime.ApplicationRedisCacheBackend), ( + "Wave 6 #38 invariant: cross-loop rebuild must produce a real Redis-backed cache, " + "not a silent Noop fallback (per feedback_announce_equals_landed.md narrative-truth lock)" + ) + # Metric counter records the loop switch event for operator + # observability. + counts = application_cache_metrics.snapshot()["application_runtime"] + assert counts["loop_switch_rebuild"] == 1 + # WARN log emitted with the operator-actionable phrasing. + assert any( + "rebuilding for new event loop" in record.getMessage() + for record in caplog.records + if record.levelname == "WARNING" + ) + + application_runtime.reset_application_cache_for_tests() + + +def test_application_cache_does_not_increment_loop_switch_metric_on_first_build(monkeypatch): + """First-ever call (no prior cache) must NOT count as a loop switch + — the counter should only record genuine rebuilds caused by an + event-loop change.""" + from aperag.cache import application_runtime + from aperag.cache.application import application_cache_metrics + + application_runtime.reset_application_cache_for_tests() + monkeypatch.setattr(application_runtime.settings, "cache_enabled", True) + monkeypatch.setattr(application_runtime.settings, "cache_redis_url", "redis://cache.example/2") + + class FakeRedisClient: + async def ping(self): + return True + + with patch.object(application_runtime.async_redis.Redis, "from_url", return_value=FakeRedisClient()): + _run_in_new_loop(application_runtime.get_application_cache) + + snapshot = application_cache_metrics.snapshot() + # The counter key may be entirely absent on first build — that's + # the right behaviour. Don't even materialise the namespace. + assert "loop_switch_rebuild" not in snapshot.get("application_runtime", {}) + + application_runtime.reset_application_cache_for_tests() + + +def test_application_cache_repeat_call_in_same_loop_is_singleton_no_metric_bump(monkeypatch): + """Repeated calls on the same loop must reuse the singleton cache + and never bump the loop_switch counter.""" + from aperag.cache import application_runtime + from aperag.cache.application import application_cache_metrics + + application_runtime.reset_application_cache_for_tests() + monkeypatch.setattr(application_runtime.settings, "cache_enabled", True) + monkeypatch.setattr(application_runtime.settings, "cache_redis_url", "redis://cache.example/2") + + class FakeRedisClient: + async def ping(self): + return True + + async def _double_get(): + first = await application_runtime.get_application_cache() + second = await application_runtime.get_application_cache() + return first, second + + with patch.object(application_runtime.async_redis.Redis, "from_url", return_value=FakeRedisClient()): + first, second = _run_in_new_loop(_double_get) + + assert first is second # Singleton on the same loop + snapshot = application_cache_metrics.snapshot() + assert "loop_switch_rebuild" not in snapshot.get("application_runtime", {}) + + application_runtime.reset_application_cache_for_tests() + + +def test_application_cache_falls_back_to_noop_when_redis_unreachable_on_new_loop(monkeypatch): + """If the real Redis cannot be reached on the new loop, the rebuild + legitimately falls back to a Noop backend (existing behaviour for + Redis-unavailable). Pre-Wave-6 the cross-loop branch silently used + Noop *without* trying Redis; Wave 6 only falls back on a real + Redis failure, and the existing Redis-unavailable WARN still fires.""" + from aperag.cache import application_runtime + from aperag.cache.application import application_cache_metrics + + application_runtime.reset_application_cache_for_tests() + monkeypatch.setattr(application_runtime.settings, "cache_enabled", True) + monkeypatch.setattr(application_runtime.settings, "cache_redis_url", "redis://cache.example/2") + + class GoodRedis: + async def ping(self): + return True + + class BadRedis: + async def ping(self): + raise RuntimeError("redis down on new loop") + + sequence = [GoodRedis(), BadRedis()] + + def _from_url(*args, **kwargs): + return sequence.pop(0) + + with patch.object(application_runtime.async_redis.Redis, "from_url", side_effect=_from_url): + first = _run_in_new_loop(application_runtime.get_application_cache) + second = _run_in_new_loop(application_runtime.get_application_cache) + + # First loop: real backend. + assert isinstance(first._backend, application_runtime.ApplicationRedisCacheBackend) + # Second loop: rebuild attempted real Redis but it failed → Noop. + # Crucially this is a *real* failure path, not the silent + # cross-loop downgrade we removed in Wave 6 #38. + assert isinstance(second._backend, application_runtime.NoopApplicationCacheBackend) + # The loop-switch metric still bumps (we did try to rebuild — + # just landed on Noop because the new loop's Redis was down). + counts = application_cache_metrics.snapshot()["application_runtime"] + assert counts["loop_switch_rebuild"] == 1 + + application_runtime.reset_application_cache_for_tests()